This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 4c3fbb8e3 Fix shuffle with round robin partitioning fail (#5928)
4c3fbb8e3 is described below

commit 4c3fbb8e369de480ba31b6385eb35c6d8c3852be
Author: Xiduo You <[email protected]>
AuthorDate: Fri May 31 09:19:25 2024 +0800

    Fix shuffle with round robin partitioning fail (#5928)
---
 .../backendsapi/velox/VeloxSparkPlanExecApi.scala  | 48 ++++++++++++++--------
 .../org/apache/gluten/execution/TestOperator.scala | 23 +++++++++++
 2 files changed, 55 insertions(+), 16 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 92be63a58..58b27e8a7 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -347,23 +347,39 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
         // See 
https://github.com/apache/spark/blob/609bd4839e5d504917de74ed1cb9c23645fba51f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala#L279-L283
         // scalastyle:on line.size.limit
         allowHashOnMap {
-          val hashExpr = new Murmur3Hash(newChild.output)
-          val projectList = Seq(Alias(hashExpr, "hash_partition_key")()) ++ 
newChild.output
-          val projectTransformer = ProjectExecTransformer(projectList, 
newChild)
-          val sortOrder = SortOrder(projectTransformer.output.head, Ascending)
-          val sortByHashCode =
-            SortExecTransformer(Seq(sortOrder), global = false, 
projectTransformer)
-          val dropSortColumnTransformer =
-            ProjectExecTransformer(projectList.drop(1), sortByHashCode)
-          val validationResult = dropSortColumnTransformer.doValidate()
-          if (validationResult.isValid) {
-            ColumnarShuffleExchangeExec(
-              shuffle,
-              dropSortColumnTransformer,
-              dropSortColumnTransformer.output)
+          // Velox hash expression does not support null type and we also do 
not need to sort
+          // null type since the value is always null.
+          val columnsForHash = newChild.output.filterNot(_.dataType == 
NullType)
+          if (columnsForHash.isEmpty) {
+            ColumnarShuffleExchangeExec(shuffle, newChild, newChild.output)
           } else {
-            TransformHints.tagNotTransformable(shuffle, validationResult)
-            shuffle.withNewChildren(newChild :: Nil)
+            val hashExpr = new Murmur3Hash(columnsForHash)
+            val projectList = Seq(Alias(hashExpr, "hash_partition_key")()) ++ 
newChild.output
+            val projectTransformer = ProjectExecTransformer(projectList, 
newChild)
+            val projectBeforeSortValidationResult = 
projectTransformer.doValidate()
+            // Make sure we support offloading the hash expression
+            val projectBeforeSort = if 
(projectBeforeSortValidationResult.isValid) {
+              projectTransformer
+            } else {
+              val project = ProjectExec(projectList, newChild)
+              TransformHints.tagNotTransformable(project, 
projectBeforeSortValidationResult)
+              project
+            }
+            val sortOrder = SortOrder(projectBeforeSort.output.head, Ascending)
+            val sortByHashCode =
+              SortExecTransformer(Seq(sortOrder), global = false, 
projectBeforeSort)
+            val dropSortColumnTransformer =
+              ProjectExecTransformer(projectList.drop(1), sortByHashCode)
+            val validationResult = dropSortColumnTransformer.doValidate()
+            if (validationResult.isValid) {
+              ColumnarShuffleExchangeExec(
+                shuffle,
+                dropSortColumnTransformer,
+                dropSortColumnTransformer.output)
+            } else {
+              TransformHints.tagNotTransformable(shuffle, validationResult)
+              shuffle.withNewChildren(newChild :: Nil)
+            }
           }
         }
       case _ =>
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala 
b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
index 7bbc24d45..7088b7b07 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
@@ -1536,4 +1536,27 @@ class TestOperator extends 
VeloxWholeStageTransformerSuite {
       checkGlutenOperatorMatch[GenerateExecTransformer]
     }
   }
+
+  test("Fix shuffle with round robin partitioning fail") {
+    def checkNullTypeRepartition(df: => DataFrame, numProject: Int): Unit = {
+      var expected: Array[Row] = null
+      withSQLConf("spark.sql.execution.sortBeforeRepartition" -> "false") {
+        expected = df.collect()
+      }
+      val actual = df
+      checkAnswer(actual, expected)
+      assert(
+        collect(actual.queryExecution.executedPlan) { case p: ProjectExec => p 
}.size == numProject
+      )
+    }
+
+    checkNullTypeRepartition(
+      spark.table("lineitem").selectExpr("l_orderkey", "null as 
x").repartition(),
+      0
+    )
+    checkNullTypeRepartition(
+      spark.table("lineitem").selectExpr("null as x", "null as 
y").repartition(),
+      1
+    )
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to