Repository: spark Updated Branches: refs/heads/master 5c27b0d4f -> 6193a202a
[SPARK-24978][SQL] Add spark.sql.fast.hash.aggregate.row.max.capacity to configure the capacity of fast aggregation. ## What changes were proposed in this pull request? This PR adds a configuration parameter to configure the capacity of fast aggregation. Performance comparison: ``` Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Windows 7 6.1 Intel64 Family 6 Model 94 Stepping 3, GenuineIntel Aggregate w multiple keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ fasthash = default 5612 / 5882 3.7 267.6 1.0X fasthash = config 3586 / 3595 5.8 171.0 1.6X ``` ## How was this patch tested? The existing test cases. Closes #21931 from heary-cao/FastHashCapacity. Authored-by: caoxuewen <cao.xue...@zte.com.cn> Signed-off-by: Wenchen Fan <wenc...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6193a202 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6193a202 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6193a202 Branch: refs/heads/master Commit: 6193a202aab0271b4532ee4b740318290f2c44a1 Parents: 5c27b0d Author: caoxuewen <cao.xue...@zte.com.cn> Authored: Mon Aug 27 15:45:48 2018 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Mon Aug 27 15:45:48 2018 +0800 ---------------------------------------------------------------------- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 13 +++++++++++++ .../sql/execution/aggregate/HashAggregateExec.scala | 5 +++-- .../execution/aggregate/RowBasedHashMapGenerator.scala | 5 +++-- .../aggregate/VectorizedHashMapGenerator.scala | 5 +++-- 4 files changed, 22 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/6193a202/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index ef3ce98..6336e89 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1485,6 +1485,17 @@ object SQLConf { .intConf .createWithDefault(20) + val FAST_HASH_AGGREGATE_MAX_ROWS_CAPACITY_BIT = + buildConf("spark.sql.codegen.aggregate.fastHashMap.capacityBit") + .internal() + .doc("Capacity for the max number of rows to be held in memory " + + "by the fast hash aggregate product operator. The bit is not for actual value, " + + "but the actual numBuckets is determined by loadFactor " + + "(e.g: default bit value 16 , the actual numBuckets is ((1 << 16) / 0.5).") + .intConf + .checkValue(bit => bit >= 10 && bit <= 30, "The bit value must be in [10, 30].") + .createWithDefault(16) + val AVRO_COMPRESSION_CODEC = buildConf("spark.sql.avro.compression.codec") .doc("Compression codec used in writing of AVRO files. Supported codecs: " + "uncompressed, deflate, snappy, bzip2 and xz. Default codec is snappy.") @@ -1703,6 +1714,8 @@ class SQLConf extends Serializable with Logging { def topKSortFallbackThreshold: Int = getConf(TOP_K_SORT_FALLBACK_THRESHOLD) + def fastHashAggregateRowMaxCapacityBit: Int = getConf(FAST_HASH_AGGREGATE_MAX_ROWS_CAPACITY_BIT) + /** * Returns the [[Resolver]] for the current configuration, which can be used to determine if two * identifiers are equal. 
http://git-wip-us.apache.org/repos/asf/spark/blob/6193a202/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index 2cac0cf..98adba5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -579,6 +579,7 @@ case class HashAggregateExec( case _ => } } + val bitMaxCapacity = sqlContext.conf.fastHashAggregateRowMaxCapacityBit val thisPlan = ctx.addReferenceObj("plan", this) @@ -588,7 +589,7 @@ case class HashAggregateExec( val fastHashMapClassName = ctx.freshName("FastHashMap") if (isVectorizedHashMapEnabled) { val generatedMap = new VectorizedHashMapGenerator(ctx, aggregateExpressions, - fastHashMapClassName, groupingKeySchema, bufferSchema).generate() + fastHashMapClassName, groupingKeySchema, bufferSchema, bitMaxCapacity).generate() ctx.addInnerClass(generatedMap) // Inline mutable state since not many aggregation operations in a task @@ -598,7 +599,7 @@ case class HashAggregateExec( forceInline = true) } else { val generatedMap = new RowBasedHashMapGenerator(ctx, aggregateExpressions, - fastHashMapClassName, groupingKeySchema, bufferSchema).generate() + fastHashMapClassName, groupingKeySchema, bufferSchema, bitMaxCapacity).generate() ctx.addInnerClass(generatedMap) // Inline mutable state since not many aggregation operations in a task http://git-wip-us.apache.org/repos/asf/spark/blob/6193a202/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala ---------------------------------------------------------------------- diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala index ca59bb1..3d2443c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala @@ -39,7 +39,8 @@ class RowBasedHashMapGenerator( aggregateExpressions: Seq[AggregateExpression], generatedClassName: String, groupingKeySchema: StructType, - bufferSchema: StructType) + bufferSchema: StructType, + bitMaxCapacity: Int) extends HashMapGenerator (ctx, aggregateExpressions, generatedClassName, groupingKeySchema, bufferSchema) { @@ -50,7 +51,7 @@ class RowBasedHashMapGenerator( s""" | private org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch batch; | private int[] buckets; - | private int capacity = 1 << 16; + | private int capacity = 1 << $bitMaxCapacity; | private double loadFactor = 0.5; | private int numBuckets = (int) (capacity / loadFactor); | private int maxSteps = 2; http://git-wip-us.apache.org/repos/asf/spark/blob/6193a202/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala index 95ebefe..f9c4ecc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala @@ -47,7 +47,8 @@ class VectorizedHashMapGenerator( aggregateExpressions: Seq[AggregateExpression], generatedClassName: String, groupingKeySchema: StructType, - bufferSchema: 
StructType) + bufferSchema: StructType, + bitMaxCapacity: Int) extends HashMapGenerator (ctx, aggregateExpressions, generatedClassName, groupingKeySchema, bufferSchema) { @@ -61,7 +62,7 @@ class VectorizedHashMapGenerator( | private ${classOf[ColumnarBatch].getName} batch; | private ${classOf[MutableColumnarRow].getName} aggBufferRow; | private int[] buckets; - | private int capacity = 1 << 16; + | private int capacity = 1 << $bitMaxCapacity; | private double loadFactor = 0.5; | private int numBuckets = (int) (capacity / loadFactor); | private int maxSteps = 2; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org