Repository: spark
Updated Branches:
  refs/heads/master 5c27b0d4f -> 6193a202a


[SPARK-24978][SQL] Add spark.sql.fast.hash.aggregate.row.max.capacity to 
configure the capacity of fast aggregation.

## What changes were proposed in this pull request?

this pr add a configuration parameter to configure the capacity of fast 
aggregation.
Performance comparison:

```
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Windows 7 6.1
 Intel64 Family 6 Model 94 Stepping 3, GenuineIntel
 Aggregate w multiple keys:               Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
 
------------------------------------------------------------------------------------------------
 fasthash = default                            5612 / 5882          3.7         
267.6       1.0X
 fasthash = config                             3586 / 3595          5.8         
171.0       1.6X

```

## How was this patch tested?
the existed test cases.

Closes #21931 from heary-cao/FastHashCapacity.

Authored-by: caoxuewen <cao.xue...@zte.com.cn>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6193a202
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6193a202
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6193a202

Branch: refs/heads/master
Commit: 6193a202aab0271b4532ee4b740318290f2c44a1
Parents: 5c27b0d
Author: caoxuewen <cao.xue...@zte.com.cn>
Authored: Mon Aug 27 15:45:48 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Mon Aug 27 15:45:48 2018 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/internal/SQLConf.scala  | 13 +++++++++++++
 .../sql/execution/aggregate/HashAggregateExec.scala    |  5 +++--
 .../execution/aggregate/RowBasedHashMapGenerator.scala |  5 +++--
 .../aggregate/VectorizedHashMapGenerator.scala         |  5 +++--
 4 files changed, 22 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6193a202/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index ef3ce98..6336e89 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1485,6 +1485,17 @@ object SQLConf {
     .intConf
     .createWithDefault(20)
 
+  val FAST_HASH_AGGREGATE_MAX_ROWS_CAPACITY_BIT =
+    buildConf("spark.sql.codegen.aggregate.fastHashMap.capacityBit")
+      .internal()
+      .doc("Capacity for the max number of rows to be held in memory " +
+        "by the fast hash aggregate product operator. The bit is not for 
actual value, " +
+        "but the actual numBuckets is determined by loadFactor " +
+        "(e.g: default bit value 16 , the actual numBuckets is ((1 << 16) / 
0.5).")
+      .intConf
+      .checkValue(bit => bit >= 10 && bit <= 30, "The bit value must be in 
[10, 30].")
+      .createWithDefault(16)
+
   val AVRO_COMPRESSION_CODEC = buildConf("spark.sql.avro.compression.codec")
     .doc("Compression codec used in writing of AVRO files. Supported codecs: " 
+
       "uncompressed, deflate, snappy, bzip2 and xz. Default codec is snappy.")
@@ -1703,6 +1714,8 @@ class SQLConf extends Serializable with Logging {
 
   def topKSortFallbackThreshold: Int = getConf(TOP_K_SORT_FALLBACK_THRESHOLD)
 
+  def fastHashAggregateRowMaxCapacityBit: Int = 
getConf(FAST_HASH_AGGREGATE_MAX_ROWS_CAPACITY_BIT)
+
   /**
    * Returns the [[Resolver]] for the current configuration, which can be used 
to determine if two
    * identifiers are equal.

http://git-wip-us.apache.org/repos/asf/spark/blob/6193a202/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 2cac0cf..98adba5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -579,6 +579,7 @@ case class HashAggregateExec(
         case _ =>
       }
     }
+    val bitMaxCapacity = sqlContext.conf.fastHashAggregateRowMaxCapacityBit
 
     val thisPlan = ctx.addReferenceObj("plan", this)
 
@@ -588,7 +589,7 @@ case class HashAggregateExec(
       val fastHashMapClassName = ctx.freshName("FastHashMap")
       if (isVectorizedHashMapEnabled) {
         val generatedMap = new VectorizedHashMapGenerator(ctx, 
aggregateExpressions,
-          fastHashMapClassName, groupingKeySchema, bufferSchema).generate()
+          fastHashMapClassName, groupingKeySchema, bufferSchema, 
bitMaxCapacity).generate()
         ctx.addInnerClass(generatedMap)
 
         // Inline mutable state since not many aggregation operations in a task
@@ -598,7 +599,7 @@ case class HashAggregateExec(
           forceInline = true)
       } else {
         val generatedMap = new RowBasedHashMapGenerator(ctx, 
aggregateExpressions,
-          fastHashMapClassName, groupingKeySchema, bufferSchema).generate()
+          fastHashMapClassName, groupingKeySchema, bufferSchema, 
bitMaxCapacity).generate()
         ctx.addInnerClass(generatedMap)
 
         // Inline mutable state since not many aggregation operations in a task

http://git-wip-us.apache.org/repos/asf/spark/blob/6193a202/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
index ca59bb1..3d2443c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
@@ -39,7 +39,8 @@ class RowBasedHashMapGenerator(
     aggregateExpressions: Seq[AggregateExpression],
     generatedClassName: String,
     groupingKeySchema: StructType,
-    bufferSchema: StructType)
+    bufferSchema: StructType,
+    bitMaxCapacity: Int)
   extends HashMapGenerator (ctx, aggregateExpressions, generatedClassName,
     groupingKeySchema, bufferSchema) {
 
@@ -50,7 +51,7 @@ class RowBasedHashMapGenerator(
     s"""
        |  private 
org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch batch;
        |  private int[] buckets;
-       |  private int capacity = 1 << 16;
+       |  private int capacity = 1 << $bitMaxCapacity;
        |  private double loadFactor = 0.5;
        |  private int numBuckets = (int) (capacity / loadFactor);
        |  private int maxSteps = 2;

http://git-wip-us.apache.org/repos/asf/spark/blob/6193a202/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
index 95ebefe..f9c4ecc 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
@@ -47,7 +47,8 @@ class VectorizedHashMapGenerator(
     aggregateExpressions: Seq[AggregateExpression],
     generatedClassName: String,
     groupingKeySchema: StructType,
-    bufferSchema: StructType)
+    bufferSchema: StructType,
+    bitMaxCapacity: Int)
   extends HashMapGenerator (ctx, aggregateExpressions, generatedClassName,
     groupingKeySchema, bufferSchema) {
 
@@ -61,7 +62,7 @@ class VectorizedHashMapGenerator(
        |  private ${classOf[ColumnarBatch].getName} batch;
        |  private ${classOf[MutableColumnarRow].getName} aggBufferRow;
        |  private int[] buckets;
-       |  private int capacity = 1 << 16;
+       |  private int capacity = 1 << $bitMaxCapacity;
        |  private double loadFactor = 0.5;
        |  private int numBuckets = (int) (capacity / loadFactor);
        |  private int maxSteps = 2;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to