This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 4c3fbb8e3 Fix shuffle with round robin partitioning fail (#5928)
4c3fbb8e3 is described below
commit 4c3fbb8e369de480ba31b6385eb35c6d8c3852be
Author: Xiduo You <[email protected]>
AuthorDate: Fri May 31 09:19:25 2024 +0800
Fix shuffle with round robin partitioning fail (#5928)
---
.../backendsapi/velox/VeloxSparkPlanExecApi.scala | 48 ++++++++++++++--------
.../org/apache/gluten/execution/TestOperator.scala | 23 +++++++++++
2 files changed, 55 insertions(+), 16 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 92be63a58..58b27e8a7 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -347,23 +347,39 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
// See
https://github.com/apache/spark/blob/609bd4839e5d504917de74ed1cb9c23645fba51f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala#L279-L283
// scalastyle:on line.size.limit
allowHashOnMap {
- val hashExpr = new Murmur3Hash(newChild.output)
- val projectList = Seq(Alias(hashExpr, "hash_partition_key")()) ++
newChild.output
- val projectTransformer = ProjectExecTransformer(projectList,
newChild)
- val sortOrder = SortOrder(projectTransformer.output.head, Ascending)
- val sortByHashCode =
- SortExecTransformer(Seq(sortOrder), global = false,
projectTransformer)
- val dropSortColumnTransformer =
- ProjectExecTransformer(projectList.drop(1), sortByHashCode)
- val validationResult = dropSortColumnTransformer.doValidate()
- if (validationResult.isValid) {
- ColumnarShuffleExchangeExec(
- shuffle,
- dropSortColumnTransformer,
- dropSortColumnTransformer.output)
+ // Velox hash expression does not support null type and we also do
not need to sort
+ // null type since the value is always null.
+ val columnsForHash = newChild.output.filterNot(_.dataType ==
NullType)
+ if (columnsForHash.isEmpty) {
+ ColumnarShuffleExchangeExec(shuffle, newChild, newChild.output)
} else {
- TransformHints.tagNotTransformable(shuffle, validationResult)
- shuffle.withNewChildren(newChild :: Nil)
+ val hashExpr = new Murmur3Hash(columnsForHash)
+ val projectList = Seq(Alias(hashExpr, "hash_partition_key")()) ++
newChild.output
+ val projectTransformer = ProjectExecTransformer(projectList,
newChild)
+ val projectBeforeSortValidationResult =
projectTransformer.doValidate()
+ // Make sure offloading the hash expression is supported
+ val projectBeforeSort = if
(projectBeforeSortValidationResult.isValid) {
+ projectTransformer
+ } else {
+ val project = ProjectExec(projectList, newChild)
+ TransformHints.tagNotTransformable(project,
projectBeforeSortValidationResult)
+ project
+ }
+ val sortOrder = SortOrder(projectBeforeSort.output.head, Ascending)
+ val sortByHashCode =
+ SortExecTransformer(Seq(sortOrder), global = false,
projectBeforeSort)
+ val dropSortColumnTransformer =
+ ProjectExecTransformer(projectList.drop(1), sortByHashCode)
+ val validationResult = dropSortColumnTransformer.doValidate()
+ if (validationResult.isValid) {
+ ColumnarShuffleExchangeExec(
+ shuffle,
+ dropSortColumnTransformer,
+ dropSortColumnTransformer.output)
+ } else {
+ TransformHints.tagNotTransformable(shuffle, validationResult)
+ shuffle.withNewChildren(newChild :: Nil)
+ }
}
}
case _ =>
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
index 7bbc24d45..7088b7b07 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
@@ -1536,4 +1536,27 @@ class TestOperator extends
VeloxWholeStageTransformerSuite {
checkGlutenOperatorMatch[GenerateExecTransformer]
}
}
+
+ test("Fix shuffle with round robin partitioning fail") {
+ def checkNullTypeRepartition(df: => DataFrame, numProject: Int): Unit = {
+ var expected: Array[Row] = null
+ withSQLConf("spark.sql.execution.sortBeforeRepartition" -> "false") {
+ expected = df.collect()
+ }
+ val actual = df
+ checkAnswer(actual, expected)
+ assert(
+ collect(actual.queryExecution.executedPlan) { case p: ProjectExec => p
}.size == numProject
+ )
+ }
+
+ checkNullTypeRepartition(
+ spark.table("lineitem").selectExpr("l_orderkey", "null as
x").repartition(),
+ 0
+ )
+ checkNullTypeRepartition(
+ spark.table("lineitem").selectExpr("null as x", "null as
y").repartition(),
+ 1
+ )
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]