This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 743b73d [SPARK-26909][FOLLOWUP][SQL] use unsafeRow.hashCode() as hash value in HashAggregate 743b73d is described below commit 743b73daf7fbbb6cd0f763955ed331ac3889ba6f Author: yucai <y...@ebay.com> AuthorDate: Tue Feb 19 13:01:10 2019 +0800 [SPARK-26909][FOLLOWUP][SQL] use unsafeRow.hashCode() as hash value in HashAggregate ## What changes were proposed in this pull request? This is a followup PR for #21149. The new approach uses unsafeRow.hashCode() as the hash value in HashAggregate. An unsafe row also contains the [null bit set] etc., so its hash should differ from the shuffle hash, and a special seed is no longer needed. ## How was this patch tested? Unit tests. Closes #23821 from yucai/unsafe_hash. Authored-by: yucai <y...@ebay.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/sql/execution/aggregate/HashAggregateExec.scala | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index 17cc7fd..23ae1f0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -742,6 +742,7 @@ case class HashAggregateExec( val fastRowKeys = ctx.generateExpressions( bindReferences[Expression](groupingExpressions, child.output)) val unsafeRowKeys = unsafeRowKeyCode.value + val unsafeRowKeyHash = ctx.freshName("unsafeRowKeyHash") val unsafeRowBuffer = ctx.freshName("unsafeRowAggBuffer") val fastRowBuffer = ctx.freshName("fastAggBuffer") @@ -755,13 +756,6 @@ case class HashAggregateExec( } } - // generate hash code for key - // SPARK-24076: HashAggregate uses the same hash algorithm on the same expressions - // as ShuffleExchange, it may lead to bad hash conflict when 
shuffle.partitions=8192*n, - // pick a different seed to avoid this conflict - val hashExpr = Murmur3Hash(groupingExpressions, 48) - val hashEval = BindReferences.bindReference(hashExpr, child.output).genCode(ctx) - val (checkFallbackForGeneratedHashMap, checkFallbackForBytesToBytesMap, resetCounter, incCounter) = if (testFallbackStartsAt.isDefined) { val countTerm = ctx.addMutableState(CodeGenerator.JAVA_INT, "fallbackCounter") @@ -777,11 +771,11 @@ case class HashAggregateExec( s""" |// generate grouping key |${unsafeRowKeyCode.code} - |${hashEval.code} + |int $unsafeRowKeyHash = ${unsafeRowKeyCode.value}.hashCode(); |if ($checkFallbackForBytesToBytesMap) { | // try to get the buffer from hash map | $unsafeRowBuffer = - | $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, ${hashEval.value}); + | $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, $unsafeRowKeyHash); |} |// Can't allocate buffer from the hash map. Spill the map and fallback to sort-based |// aggregation after processing all input rows. @@ -795,7 +789,7 @@ case class HashAggregateExec( | // the hash map had be spilled, it should have enough memory now, | // try to allocate buffer again. | $unsafeRowBuffer = $hashMapTerm.getAggregationBufferFromUnsafeRow( - | $unsafeRowKeys, ${hashEval.value}); + | $unsafeRowKeys, $unsafeRowKeyHash); | if ($unsafeRowBuffer == null) { | // failed to allocate the first page | throw new $oomeClassName("No enough memory for aggregation"); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org