This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 280064a2e [VL] Allow hash on map for round robin repartitioning (#5349)
280064a2e is described below
commit 280064a2e6bfa0924f9a2423c530b41e69d87cb9
Author: Rong Ma <[email protected]>
AuthorDate: Mon May 27 15:39:47 2024 +0800
[VL] Allow hash on map for round robin repartitioning (#5349)
---
.../backendsapi/velox/VeloxSparkPlanExecApi.scala | 49 +++++++++++++++-------
.../org/apache/gluten/execution/TestOperator.scala | 16 +++++--
.../org/apache/gluten/fuzzer/FuzzerTestBase.scala | 2 +-
.../gluten/fuzzer/ShuffleWriterFuzzerTest.scala | 2 +-
4 files changed, 49 insertions(+), 20 deletions(-)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index cfa135046..155a33c94 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -315,6 +315,16 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
override def genColumnarShuffleExchange(
shuffle: ShuffleExchangeExec,
newChild: SparkPlan): SparkPlan = {
+ def allowHashOnMap[T](f: => T): T = {
+ val originalAllowHash =
SQLConf.get.getConf(SQLConf.LEGACY_ALLOW_HASH_ON_MAPTYPE)
+ try {
+ SQLConf.get.setConf(SQLConf.LEGACY_ALLOW_HASH_ON_MAPTYPE, true)
+ f
+ } finally {
+ SQLConf.get.setConf(SQLConf.LEGACY_ALLOW_HASH_ON_MAPTYPE,
originalAllowHash)
+ }
+ }
+
shuffle.outputPartitioning match {
case HashPartitioning(exprs, _) =>
val hashExpr = new Murmur3Hash(exprs)
@@ -331,21 +341,30 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
shuffle.withNewChildren(newChild :: Nil)
}
case RoundRobinPartitioning(num) if SQLConf.get.sortBeforeRepartition &&
num > 1 =>
- val hashExpr = new Murmur3Hash(newChild.output)
- val projectList = Seq(Alias(hashExpr, "hash_partition_key")()) ++
newChild.output
- val projectTransformer = ProjectExecTransformer(projectList, newChild)
- val sortOrder = SortOrder(projectTransformer.output.head, Ascending)
- val sortByHashCode = SortExecTransformer(Seq(sortOrder), global =
false, projectTransformer)
- val dropSortColumnTransformer =
ProjectExecTransformer(projectList.drop(1), sortByHashCode)
- val validationResult = dropSortColumnTransformer.doValidate()
- if (validationResult.isValid) {
- ColumnarShuffleExchangeExec(
- shuffle,
- dropSortColumnTransformer,
- dropSortColumnTransformer.output)
- } else {
- TransformHints.tagNotTransformable(shuffle, validationResult)
- shuffle.withNewChildren(newChild :: Nil)
+ // scalastyle:off line.size.limit
+ // Temporarily allow hash on map if it's disabled, otherwise
HashExpression will fail to get
+ // resolved if its child contains map type.
+ // See
https://github.com/apache/spark/blob/609bd4839e5d504917de74ed1cb9c23645fba51f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala#L279-L283
+ // scalastyle:on line.size.limit
+ allowHashOnMap {
+ val hashExpr = new Murmur3Hash(newChild.output)
+ val projectList = Seq(Alias(hashExpr, "hash_partition_key")()) ++
newChild.output
+ val projectTransformer = ProjectExecTransformer(projectList,
newChild)
+ val sortOrder = SortOrder(projectTransformer.output.head, Ascending)
+ val sortByHashCode =
+ SortExecTransformer(Seq(sortOrder), global = false,
projectTransformer)
+ val dropSortColumnTransformer =
+ ProjectExecTransformer(projectList.drop(1), sortByHashCode)
+ val validationResult = dropSortColumnTransformer.doValidate()
+ if (validationResult.isValid) {
+ ColumnarShuffleExchangeExec(
+ shuffle,
+ dropSortColumnTransformer,
+ dropSortColumnTransformer.output)
+ } else {
+ TransformHints.tagNotTransformable(shuffle, validationResult)
+ shuffle.withNewChildren(newChild :: Nil)
+ }
}
case _ =>
ColumnarShuffleExchangeExec(shuffle, newChild, null)
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
index 657039572..8e8423360 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
@@ -22,7 +22,7 @@ import org.apache.gluten.execution.datasource.v2.ArrowBatchScanExec
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.SparkConf
-import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.functions._
@@ -1354,7 +1354,12 @@ class TestOperator extends VeloxWholeStageTransformerSuite {
}
}
- test("test roundrobine with sort") {
+ test("test RoundRobin repartition with sort") {
+ def checkRoundRobinOperators(df: DataFrame): Unit = {
+ checkGlutenOperatorMatch[SortExecTransformer](df)
+ checkGlutenOperatorMatch[ColumnarShuffleExchangeExec](df)
+ }
+
// scalastyle:off
runQueryAndCompare("SELECT /*+ REPARTITION(3) */ l_orderkey, l_partkey
FROM lineitem") {
/*
@@ -1364,7 +1369,7 @@ class TestOperator extends VeloxWholeStageTransformerSuite {
+- ^(2) ProjectExecTransformer [hash(l_orderkey#16L,
l_partkey#17L) AS hash_partition_key#302, l_orderkey#16L, l_partkey#17L]
+- ^(2) BatchScanExecTransformer[l_orderkey#16L,
l_partkey#17L] ParquetScan DataFilters: [], Format: parquet, Location:
InMemoryFileIndex(1 paths)[..., PartitionFilters: [], PushedFilters: [],
ReadSchema: struct<l_orderkey:bigint,l_partkey:bigint>, PushedFilters: []
RuntimeFilters: []
*/
- checkGlutenOperatorMatch[SortExecTransformer]
+ checkRoundRobinOperators
}
// scalastyle:on
@@ -1377,6 +1382,11 @@ class TestOperator extends VeloxWholeStageTransformerSuite {
}
}
}
+
+ // Gluten-5206: test repartition on map type
+ runQueryAndCompare(
+ "SELECT /*+ REPARTITION(3) */ l_orderkey, map(l_orderkey, l_partkey)
FROM lineitem")(
+ checkRoundRobinOperators)
}
test("Support Map type signature") {
diff --git a/backends-velox/src/test/scala/org/apache/gluten/fuzzer/FuzzerTestBase.scala b/backends-velox/src/test/scala/org/apache/gluten/fuzzer/FuzzerTestBase.scala
index 7d59fbfae..1ee79a2ad 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/fuzzer/FuzzerTestBase.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/fuzzer/FuzzerTestBase.scala
@@ -35,7 +35,7 @@ abstract class FuzzerTestBase extends VeloxWholeStageTransformerSuite {
.set("spark.plugins", "org.apache.gluten.GlutenPlugin")
.set("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
.set("spark.memory.offHeap.enabled", "true")
- .set("spark.memory.offHeap.size", "512MB")
+ .set("spark.memory.offHeap.size", "4g")
.set("spark.driver.memory", "4g")
.set("spark.driver.maxResultSize", "4g")
}
diff --git a/backends-velox/src/test/scala/org/apache/gluten/fuzzer/ShuffleWriterFuzzerTest.scala b/backends-velox/src/test/scala/org/apache/gluten/fuzzer/ShuffleWriterFuzzerTest.scala
index 1d27f2681..7d8fc56d9 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/fuzzer/ShuffleWriterFuzzerTest.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/fuzzer/ShuffleWriterFuzzerTest.scala
@@ -68,7 +68,7 @@ class ShuffleWriterFuzzerTest extends FuzzerTestBase {
logWarning(
s"==============================> " +
s"Started reproduction (seed: ${dataGenerator.getSeed})")
- val result = defaultRunner(testShuffle(sql))
+ val result = defaultRunner(testShuffle(sql))()
assert(result.isInstanceOf[Successful], s"Failed to run 'reproduce'
with seed: $seed")
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]