This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 280064a2e  [VL] Allow hash on map for round robin repartitioning 
(#5349)
280064a2e is described below

commit 280064a2e6bfa0924f9a2423c530b41e69d87cb9
Author: Rong Ma <[email protected]>
AuthorDate: Mon May 27 15:39:47 2024 +0800

     [VL] Allow hash on map for round robin repartitioning (#5349)
---
 .../backendsapi/velox/VeloxSparkPlanExecApi.scala  | 49 +++++++++++++++-------
 .../org/apache/gluten/execution/TestOperator.scala | 16 +++++--
 .../org/apache/gluten/fuzzer/FuzzerTestBase.scala  |  2 +-
 .../gluten/fuzzer/ShuffleWriterFuzzerTest.scala    |  2 +-
 4 files changed, 49 insertions(+), 20 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index cfa135046..155a33c94 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -315,6 +315,16 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
   override def genColumnarShuffleExchange(
       shuffle: ShuffleExchangeExec,
       newChild: SparkPlan): SparkPlan = {
+    def allowHashOnMap[T](f: => T): T = {
+      val originalAllowHash = 
SQLConf.get.getConf(SQLConf.LEGACY_ALLOW_HASH_ON_MAPTYPE)
+      try {
+        SQLConf.get.setConf(SQLConf.LEGACY_ALLOW_HASH_ON_MAPTYPE, true)
+        f
+      } finally {
+        SQLConf.get.setConf(SQLConf.LEGACY_ALLOW_HASH_ON_MAPTYPE, 
originalAllowHash)
+      }
+    }
+
     shuffle.outputPartitioning match {
       case HashPartitioning(exprs, _) =>
         val hashExpr = new Murmur3Hash(exprs)
@@ -331,21 +341,30 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
           shuffle.withNewChildren(newChild :: Nil)
         }
       case RoundRobinPartitioning(num) if SQLConf.get.sortBeforeRepartition && 
num > 1 =>
-        val hashExpr = new Murmur3Hash(newChild.output)
-        val projectList = Seq(Alias(hashExpr, "hash_partition_key")()) ++ 
newChild.output
-        val projectTransformer = ProjectExecTransformer(projectList, newChild)
-        val sortOrder = SortOrder(projectTransformer.output.head, Ascending)
-        val sortByHashCode = SortExecTransformer(Seq(sortOrder), global = 
false, projectTransformer)
-        val dropSortColumnTransformer = 
ProjectExecTransformer(projectList.drop(1), sortByHashCode)
-        val validationResult = dropSortColumnTransformer.doValidate()
-        if (validationResult.isValid) {
-          ColumnarShuffleExchangeExec(
-            shuffle,
-            dropSortColumnTransformer,
-            dropSortColumnTransformer.output)
-        } else {
-          TransformHints.tagNotTransformable(shuffle, validationResult)
-          shuffle.withNewChildren(newChild :: Nil)
+        // scalastyle:off line.size.limit
+        // Temporarily allow hash on map if it's disabled, otherwise 
HashExpression will fail to get
+        // resolved if its child contains map type.
+        // See 
https://github.com/apache/spark/blob/609bd4839e5d504917de74ed1cb9c23645fba51f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala#L279-L283
+        // scalastyle:on line.size.limit
+        allowHashOnMap {
+          val hashExpr = new Murmur3Hash(newChild.output)
+          val projectList = Seq(Alias(hashExpr, "hash_partition_key")()) ++ 
newChild.output
+          val projectTransformer = ProjectExecTransformer(projectList, 
newChild)
+          val sortOrder = SortOrder(projectTransformer.output.head, Ascending)
+          val sortByHashCode =
+            SortExecTransformer(Seq(sortOrder), global = false, 
projectTransformer)
+          val dropSortColumnTransformer =
+            ProjectExecTransformer(projectList.drop(1), sortByHashCode)
+          val validationResult = dropSortColumnTransformer.doValidate()
+          if (validationResult.isValid) {
+            ColumnarShuffleExchangeExec(
+              shuffle,
+              dropSortColumnTransformer,
+              dropSortColumnTransformer.output)
+          } else {
+            TransformHints.tagNotTransformable(shuffle, validationResult)
+            shuffle.withNewChildren(newChild :: Nil)
+          }
         }
       case _ =>
         ColumnarShuffleExchangeExec(shuffle, newChild, null)
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala 
b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
index 657039572..8e8423360 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
@@ -22,7 +22,7 @@ import 
org.apache.gluten.execution.datasource.v2.ArrowBatchScanExec
 import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.functions._
@@ -1354,7 +1354,12 @@ class TestOperator extends 
VeloxWholeStageTransformerSuite {
     }
   }
 
-  test("test roundrobine with sort") {
+  test("test RoundRobin repartition with sort") {
+    def checkRoundRobinOperators(df: DataFrame): Unit = {
+      checkGlutenOperatorMatch[SortExecTransformer](df)
+      checkGlutenOperatorMatch[ColumnarShuffleExchangeExec](df)
+    }
+
     // scalastyle:off
     runQueryAndCompare("SELECT /*+ REPARTITION(3) */ l_orderkey, l_partkey 
FROM lineitem") {
       /*
@@ -1364,7 +1369,7 @@ class TestOperator extends 
VeloxWholeStageTransformerSuite {
             +- ^(2) ProjectExecTransformer [hash(l_orderkey#16L, 
l_partkey#17L) AS hash_partition_key#302, l_orderkey#16L, l_partkey#17L]
                 +- ^(2) BatchScanExecTransformer[l_orderkey#16L, 
l_partkey#17L] ParquetScan DataFilters: [], Format: parquet, Location: 
InMemoryFileIndex(1 paths)[..., PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<l_orderkey:bigint,l_partkey:bigint>, PushedFilters: [] 
RuntimeFilters: []
        */
-      checkGlutenOperatorMatch[SortExecTransformer]
+      checkRoundRobinOperators
     }
     // scalastyle:on
 
@@ -1377,6 +1382,11 @@ class TestOperator extends 
VeloxWholeStageTransformerSuite {
           }
       }
     }
+
+    // Gluten-5206: test repartition on map type
+    runQueryAndCompare(
+      "SELECT /*+ REPARTITION(3) */ l_orderkey, map(l_orderkey, l_partkey) 
FROM lineitem")(
+      checkRoundRobinOperators)
   }
 
   test("Support Map type signature") {
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/fuzzer/FuzzerTestBase.scala 
b/backends-velox/src/test/scala/org/apache/gluten/fuzzer/FuzzerTestBase.scala
index 7d59fbfae..1ee79a2ad 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/fuzzer/FuzzerTestBase.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/fuzzer/FuzzerTestBase.scala
@@ -35,7 +35,7 @@ abstract class FuzzerTestBase extends 
VeloxWholeStageTransformerSuite {
       .set("spark.plugins", "org.apache.gluten.GlutenPlugin")
       .set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
       .set("spark.memory.offHeap.enabled", "true")
-      .set("spark.memory.offHeap.size", "512MB")
+      .set("spark.memory.offHeap.size", "4g")
       .set("spark.driver.memory", "4g")
       .set("spark.driver.maxResultSize", "4g")
   }
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/fuzzer/ShuffleWriterFuzzerTest.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/fuzzer/ShuffleWriterFuzzerTest.scala
index 1d27f2681..7d8fc56d9 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/fuzzer/ShuffleWriterFuzzerTest.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/fuzzer/ShuffleWriterFuzzerTest.scala
@@ -68,7 +68,7 @@ class ShuffleWriterFuzzerTest extends FuzzerTestBase {
         logWarning(
           s"==============================> " +
             s"Started reproduction (seed: ${dataGenerator.getSeed})")
-        val result = defaultRunner(testShuffle(sql))
+        val result = defaultRunner(testShuffle(sql))()
         assert(result.isInstanceOf[Successful], s"Failed to run 'reproduce' 
with seed: $seed")
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to