This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 743b73d  [SPARK-26909][FOLLOWUP][SQL] use unsafeRow.hashCode() as hash 
value in HashAggregate
743b73d is described below

commit 743b73daf7fbbb6cd0f763955ed331ac3889ba6f
Author: yucai <y...@ebay.com>
AuthorDate: Tue Feb 19 13:01:10 2019 +0800

    [SPARK-26909][FOLLOWUP][SQL] use unsafeRow.hashCode() as hash value in 
HashAggregate
    
    ## What changes were proposed in this pull request?
    
    This is a followup PR for #21149.
    
    New way uses unsafeRow.hashCode() as hash value in HashAggregate.
    The unsafe row has [null bit set] etc., so the hash should be different 
from shuffle hash, and then we don't need a special seed.
    
    ## How was this patch tested?
    
    UTs.
    
    Closes #23821 from yucai/unsafe_hash.
    
    Authored-by: yucai <y...@ebay.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/execution/aggregate/HashAggregateExec.scala  | 14 ++++----------
 1 file changed, 4 insertions(+), 10 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 17cc7fd..23ae1f0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -742,6 +742,7 @@ case class HashAggregateExec(
     val fastRowKeys = ctx.generateExpressions(
       bindReferences[Expression](groupingExpressions, child.output))
     val unsafeRowKeys = unsafeRowKeyCode.value
+    val unsafeRowKeyHash = ctx.freshName("unsafeRowKeyHash")
     val unsafeRowBuffer = ctx.freshName("unsafeRowAggBuffer")
     val fastRowBuffer = ctx.freshName("fastAggBuffer")
 
@@ -755,13 +756,6 @@ case class HashAggregateExec(
       }
     }
 
-    // generate hash code for key
-    // SPARK-24076: HashAggregate uses the same hash algorithm on the same 
expressions
-    // as ShuffleExchange, it may lead to bad hash conflict when 
shuffle.partitions=8192*n,
-    // pick a different seed to avoid this conflict
-    val hashExpr = Murmur3Hash(groupingExpressions, 48)
-    val hashEval = BindReferences.bindReference(hashExpr, 
child.output).genCode(ctx)
-
     val (checkFallbackForGeneratedHashMap, checkFallbackForBytesToBytesMap, 
resetCounter,
     incCounter) = if (testFallbackStartsAt.isDefined) {
       val countTerm = ctx.addMutableState(CodeGenerator.JAVA_INT, 
"fallbackCounter")
@@ -777,11 +771,11 @@ case class HashAggregateExec(
       s"""
          |// generate grouping key
          |${unsafeRowKeyCode.code}
-         |${hashEval.code}
+         |int $unsafeRowKeyHash = ${unsafeRowKeyCode.value}.hashCode();
          |if ($checkFallbackForBytesToBytesMap) {
          |  // try to get the buffer from hash map
          |  $unsafeRowBuffer =
-         |    $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, 
${hashEval.value});
+         |    $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, 
$unsafeRowKeyHash);
          |}
          |// Can't allocate buffer from the hash map. Spill the map and 
fallback to sort-based
          |// aggregation after processing all input rows.
@@ -795,7 +789,7 @@ case class HashAggregateExec(
          |  // the hash map had be spilled, it should have enough memory now,
          |  // try to allocate buffer again.
          |  $unsafeRowBuffer = $hashMapTerm.getAggregationBufferFromUnsafeRow(
-         |    $unsafeRowKeys, ${hashEval.value});
+         |    $unsafeRowKeys, $unsafeRowKeyHash);
          |  if ($unsafeRowBuffer == null) {
          |    // failed to allocate the first page
          |    throw new $oomeClassName("No enough memory for aggregation");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to