This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 044811567 Remove local sort for TopNRowNumber (#6381)
044811567 is described below
commit 04481156742f7293d82ecf3f2fd8c406d3dd35d3
Author: Xiduo You <[email protected]>
AuthorDate: Thu Jul 11 20:08:19 2024 +0800
Remove local sort for TopNRowNumber (#6381)
Co-authored-by: Kent Yao <[email protected]>
---
.../gluten/backendsapi/velox/VeloxBackend.scala | 6 +-
.../gluten/backendsapi/BackendSettingsApi.scala | 4 +-
.../HashAggregateExecBaseTransformer.scala | 5 +-
.../org/apache/gluten/execution/SortUtils.scala | 49 -----------
.../gluten/execution/WindowExecTransformer.scala | 6 +-
.../WindowGroupLimitExecTransformer.scala | 14 +++-
.../extension/columnar/EliminateLocalSort.scala | 94 ++++++++++++++++++++++
.../columnar/EnsureLocalSortRequirements.scala | 21 +++--
.../extension/columnar/FallbackTagRule.scala | 6 +-
.../extension/columnar/OffloadSingleNode.scala | 10 +--
.../columnar/enumerated/EnumeratedApplier.scala | 1 +
.../enumerated/RasOffloadHashAggregate.scala | 2 +-
.../columnar/heuristic/HeuristicApplier.scala | 1 +
.../extension/columnar/rewrite/RewriteJoin.scala | 5 +-
.../execution/GlutenSQLWindowFunctionSuite.scala | 5 +-
15 files changed, 142 insertions(+), 87 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 63bfcf220..8d98c111a 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -485,7 +485,11 @@ object VeloxBackendSettings extends BackendSettingsApi {
override def alwaysFailOnMapExpression(): Boolean = true
- override def requiredChildOrderingForWindow(): Boolean = true
+ override def requiredChildOrderingForWindow(): Boolean = {
+ GlutenConfig.getConf.veloxColumnarWindowType.equals("streaming")
+ }
+
+ override def requiredChildOrderingForWindowGroupLimit(): Boolean = false
override def staticPartitionWriteOnly(): Boolean = true
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 8ddcc7b7f..07ead8860 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -123,9 +123,9 @@ trait BackendSettingsApi {
def alwaysFailOnMapExpression(): Boolean = false
- def requiredChildOrderingForWindow(): Boolean = false
+ def requiredChildOrderingForWindow(): Boolean = true
- def requiredChildOrderingForWindowGroupLimit(): Boolean = false
+ def requiredChildOrderingForWindowGroupLimit(): Boolean = true
def staticPartitionWriteOnly(): Boolean = false
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/HashAggregateExecBaseTransformer.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/HashAggregateExecBaseTransformer.scala
index 9345b3a36..b200426d9 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/HashAggregateExecBaseTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/HashAggregateExecBaseTransformer.scala
@@ -185,8 +185,7 @@ object HashAggregateExecBaseTransformer {
case a: SortAggregateExec => a.initialInputBufferOffset
}
- def from(agg: BaseAggregateExec)(
- childConverter: SparkPlan => SparkPlan = p => p):
HashAggregateExecBaseTransformer = {
+ def from(agg: BaseAggregateExec): HashAggregateExecBaseTransformer = {
BackendsApiManager.getSparkPlanExecApiInstance
.genHashAggregateExecTransformer(
agg.requiredChildDistributionExpressions,
@@ -195,7 +194,7 @@ object HashAggregateExecBaseTransformer {
agg.aggregateAttributes,
getInitialInputBufferOffset(agg),
agg.resultExpressions,
- childConverter(agg.child)
+ agg.child
)
}
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/SortUtils.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/SortUtils.scala
deleted file mode 100644
index b01c71738..000000000
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/SortUtils.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.execution
-
-import org.apache.gluten.extension.columnar.rewrite.RewrittenNodeWall
-
-import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan}
-
-object SortUtils {
- def dropPartialSort(plan: SparkPlan): SparkPlan = plan match {
- case RewrittenNodeWall(p) => RewrittenNodeWall(dropPartialSort(p))
- case PartialSortLike(child) => child
- // from pre/post project-pulling
- case ProjectLike(PartialSortLike(ProjectLike(child))) if plan.outputSet ==
child.outputSet =>
- child
- case ProjectLike(PartialSortLike(child)) =>
plan.withNewChildren(Seq(child))
- case _ => plan
- }
-}
-
-object PartialSortLike {
- def unapply(plan: SparkPlan): Option[SparkPlan] = plan match {
- case sort: SortExecTransformer if !sort.global => Some(sort.child)
- case sort: SortExec if !sort.global => Some(sort.child)
- case _ => None
- }
-}
-
-object ProjectLike {
- def unapply(plan: SparkPlan): Option[SparkPlan] = plan match {
- case project: ProjectExecTransformer => Some(project.child)
- case project: ProjectExec => Some(project.child)
- case _ => None
- }
-}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala
index 6832221a4..628c08f29 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala
@@ -67,11 +67,7 @@ case class WindowExecTransformer(
}
override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
- if (
- BackendsApiManager.getSettings.requiredChildOrderingForWindow()
- && GlutenConfig.getConf.veloxColumnarWindowType.equals("streaming")
- ) {
- // Velox StreamingWindow need to require child order.
+ if (BackendsApiManager.getSettings.requiredChildOrderingForWindow()) {
Seq(partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec)
} else {
Seq(Nil)
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/WindowGroupLimitExecTransformer.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/WindowGroupLimitExecTransformer.scala
index 46a4e1aa4..c93d01e7a 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/WindowGroupLimitExecTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/WindowGroupLimitExecTransformer.scala
@@ -66,14 +66,24 @@ case class WindowGroupLimitExecTransformer(
override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
if
(BackendsApiManager.getSettings.requiredChildOrderingForWindowGroupLimit()) {
- // Velox StreamingTopNRowNumber need to require child order.
Seq(partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec)
} else {
Seq(Nil)
}
}
- override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+ override def outputOrdering: Seq[SortOrder] = {
+ if (requiredChildOrdering.forall(_.isEmpty)) {
+ // The Velox backend `TopNRowNumber` does not require child ordering,
because it
+ // uses a hash table to store partitions and a priority queue to keep track
of the top limit rows.
+ // Ideally, the output of `TopNRowNumber` is unordered, but it is grouped
by partition keys.
+ // To be safe, here we do not propagate the ordering.
+ // TODO: Make the framework aware of grouped data distribution
+ Nil
+ } else {
+ child.outputOrdering
+ }
+ }
override def outputPartitioning: Partitioning = child.outputPartitioning
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EliminateLocalSort.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EliminateLocalSort.scala
new file mode 100644
index 000000000..6a5c195e5
--- /dev/null
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EliminateLocalSort.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar
+
+import org.apache.gluten.execution.{HashAggregateExecBaseTransformer,
ProjectExecTransformer, ShuffledHashJoinExecTransformerBase,
SortExecTransformer, WindowExecTransformer, WindowGroupLimitExecTransformer}
+
+import org.apache.spark.sql.catalyst.expressions.SortOrder
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan,
UnaryExecNode}
+
+/**
+ * This rule is used to eliminate unnecessary local sort.
+ *
+ * This could happen if:
+ * - Convert sort merge join to shuffled hash join
+ * - Offload SortAggregate to native hash aggregate
+ * - Offload WindowGroupLimit to native TopNRowNumber
+ * - The columnar window type is `sort`
+ */
+object EliminateLocalSort extends Rule[SparkPlan] {
+ private def canEliminateLocalSort(p: SparkPlan): Boolean = p match {
+ case _: HashAggregateExecBaseTransformer => true
+ case _: ShuffledHashJoinExecTransformerBase => true
+ case _: WindowGroupLimitExecTransformer => true
+ case _: WindowExecTransformer => true
+ case _ => false
+ }
+
+ private def canThrough(p: SparkPlan): Boolean = p match {
+ case _: ProjectExec => true
+ case _: ProjectExecTransformer => true
+ case _ => false
+ }
+
+ private def orderingSatisfies(gChild: SparkPlan, requiredOrdering:
Seq[SortOrder]): Boolean = {
+ SortOrder.orderingSatisfies(gChild.outputOrdering, requiredOrdering)
+ }
+
+ override def apply(plan: SparkPlan): SparkPlan = {
+ plan.transformDown {
+ case p if canEliminateLocalSort(p) =>
+ val requiredChildOrdering = p.requiredChildOrdering
+ assert(requiredChildOrdering.size == p.children.size)
+ val newChildren = p.children.zipWithIndex.map {
+ case (SortWithChild(gChild), i) if orderingSatisfies(gChild,
requiredChildOrdering(i)) =>
+ gChild
+ case (p: UnaryExecNode, i) if canThrough(p) =>
+ // There may be more than one project between the target operator and the
sort,
+ // e.g., both hash aggregate and sort pull out a project
+ p.child match {
+ case SortWithChild(gChild) if orderingSatisfies(gChild,
requiredChildOrdering(i)) =>
+ p.withNewChildren(gChild :: Nil)
+ case _ => p
+ }
+ case p => p._1
+ }
+ p.withNewChildren(newChildren)
+ }
+ }
+}
+
+object SortWithChild {
+ def unapply(plan: SparkPlan): Option[SparkPlan] = {
+ plan match {
+ case p1 @ ProjectExec(_, SortExecTransformer(_, false, p2: ProjectExec,
_))
+ if p1.outputSet == p2.child.outputSet =>
+ Some(p2.child)
+ case p1 @ ProjectExecTransformer(
+ _,
+ SortExecTransformer(_, false, p2: ProjectExecTransformer, _))
+ if p1.outputSet == p2.child.outputSet =>
+ Some(p2.child)
+ case SortExec(_, false, child, _) =>
+ Some(child)
+ case SortExecTransformer(_, false, child, _) =>
+ Some(child)
+ case _ => None
+ }
+ }
+}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EnsureLocalSortRequirements.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EnsureLocalSortRequirements.scala
index afc29a51e..ff989d796 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EnsureLocalSortRequirements.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EnsureLocalSortRequirements.scala
@@ -17,7 +17,8 @@
package org.apache.gluten.extension.columnar
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.execution.SortExecTransformer
+import
org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverrides
+import
org.apache.gluten.extension.columnar.rewrite.RewriteSparkPlanRulesManager
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.rules.Rule
@@ -32,6 +33,8 @@ import org.apache.spark.sql.execution.{SortExec, SparkPlan}
* SortAggregate with the same key. So, this rule adds local sort back if
necessary.
*/
object EnsureLocalSortRequirements extends Rule[SparkPlan] {
+ private lazy val offload = TransformPreOverrides.apply()
+
private def addLocalSort(
originalChild: SparkPlan,
requiredOrdering: Seq[SortOrder]): SparkPlan = {
@@ -40,18 +43,12 @@ object EnsureLocalSortRequirements extends Rule[SparkPlan] {
FallbackTags.add(newChild, "columnar Sort is not enabled in SortExec")
newChild
} else {
- val newChildWithTransformer =
- SortExecTransformer(
- newChild.sortOrder,
- newChild.global,
- newChild.child,
- newChild.testSpillFrequency)
- val validationResult = newChildWithTransformer.doValidate()
- if (validationResult.isValid) {
- newChildWithTransformer
+ val rewrittenPlan = RewriteSparkPlanRulesManager.apply().apply(newChild)
+ if (rewrittenPlan.eq(newChild) && FallbackTags.nonEmpty(rewrittenPlan)) {
+ // The sort can not be offloaded
+ rewrittenPlan
} else {
- FallbackTags.add(newChild, validationResult)
- newChild
+ offload.apply(rewrittenPlan)
}
}
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTagRule.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTagRule.scala
index d34cb0df4..ddc6870e6 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTagRule.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTagRule.scala
@@ -344,13 +344,13 @@ case class AddFallbackTagRule() extends Rule[SparkPlan] {
.genFilterExecTransformer(plan.condition, plan.child)
transformer.doValidate().tagOnFallback(plan)
case plan: HashAggregateExec =>
- val transformer = HashAggregateExecBaseTransformer.from(plan)()
+ val transformer = HashAggregateExecBaseTransformer.from(plan)
transformer.doValidate().tagOnFallback(plan)
case plan: SortAggregateExec =>
- val transformer = HashAggregateExecBaseTransformer.from(plan)()
+ val transformer = HashAggregateExecBaseTransformer.from(plan)
transformer.doValidate().tagOnFallback(plan)
case plan: ObjectHashAggregateExec =>
- val transformer = HashAggregateExecBaseTransformer.from(plan)()
+ val transformer = HashAggregateExecBaseTransformer.from(plan)
transformer.doValidate().tagOnFallback(plan)
case plan: UnionExec =>
val transformer = ColumnarUnionExec(plan.children)
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
index 62c72af79..742c35341 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
@@ -87,9 +87,9 @@ case class OffloadAggregate() extends OffloadSingleNode with
LogLevelUtil {
case _: TransformSupport =>
// If the child is transformable, transform aggregation as well.
logDebug(s"Columnar Processing for ${plan.getClass} is currently
supported.")
- HashAggregateExecBaseTransformer.from(plan)()
+ HashAggregateExecBaseTransformer.from(plan)
case p: SparkPlan if PlanUtil.isGlutenTableCache(p) =>
- HashAggregateExecBaseTransformer.from(plan)()
+ HashAggregateExecBaseTransformer.from(plan)
case _ =>
// If the child is not transformable, do not transform the agg.
FallbackTags.add(plan, "child output schema is empty")
@@ -97,7 +97,7 @@ case class OffloadAggregate() extends OffloadSingleNode with
LogLevelUtil {
}
} else {
logDebug(s"Columnar Processing for ${plan.getClass} is currently
supported.")
- HashAggregateExecBaseTransformer.from(plan)()
+ HashAggregateExecBaseTransformer.from(plan)
}
}
}
@@ -425,10 +425,10 @@ object OffloadOthers {
ColumnarCoalesceExec(plan.numPartitions, plan.child)
case plan: SortAggregateExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently
supported.")
-
HashAggregateExecBaseTransformer.from(plan)(SortUtils.dropPartialSort)
+ HashAggregateExecBaseTransformer.from(plan)
case plan: ObjectHashAggregateExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently
supported.")
- HashAggregateExecBaseTransformer.from(plan)()
+ HashAggregateExecBaseTransformer.from(plan)
case plan: UnionExec =>
val children = plan.children
logDebug(s"Columnar Processing for ${plan.getClass} is currently
supported.")
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
index 519db966c..3df0282f8 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
@@ -102,6 +102,7 @@ class EnumeratedApplier(session: SparkSession)
List(
(_: SparkSession) => RemoveNativeWriteFilesSortAndProject(),
(spark: SparkSession) => RewriteTransformer(spark),
+ (_: SparkSession) => EliminateLocalSort,
(_: SparkSession) => EnsureLocalSortRequirements,
(_: SparkSession) => CollapseProjectExecTransformer
) :::
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffloadHashAggregate.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffloadHashAggregate.scala
index 6c125478b..272e05ca7 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffloadHashAggregate.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffloadHashAggregate.scala
@@ -24,7 +24,7 @@ import
org.apache.spark.sql.execution.aggregate.HashAggregateExec
object RasOffloadHashAggregate extends RasOffload {
override def offload(node: SparkPlan): SparkPlan = node match {
case agg: HashAggregateExec =>
- val out = HashAggregateExecBaseTransformer.from(agg)()
+ val out = HashAggregateExecBaseTransformer.from(agg)
out
case other => other
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
index 03b2b66b0..738d67f4b 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
@@ -114,6 +114,7 @@ class HeuristicApplier(session: SparkSession)
List(
(_: SparkSession) => RemoveNativeWriteFilesSortAndProject(),
(spark: SparkSession) => RewriteTransformer(spark),
+ (_: SparkSession) => EliminateLocalSort,
(_: SparkSession) => EnsureLocalSortRequirements,
(_: SparkSession) => CollapseProjectExecTransformer
) :::
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteJoin.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteJoin.scala
index e038f5af0..4fd420b02 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteJoin.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteJoin.scala
@@ -17,7 +17,6 @@
package org.apache.gluten.extension.columnar.rewrite
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.execution.SortUtils
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight,
BuildSide, JoinSelectionHelper}
import org.apache.spark.sql.catalyst.plans.JoinType
@@ -52,8 +51,8 @@ object RewriteJoin extends RewriteSingleNode with
JoinSelectionHelper {
smj.joinType,
buildSide,
smj.condition,
- SortUtils.dropPartialSort(smj.left),
- SortUtils.dropPartialSort(smj.right),
+ smj.left,
+ smj.right,
smj.isSkewJoin
)
case _ => plan
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenSQLWindowFunctionSuite.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenSQLWindowFunctionSuite.scala
index 89a435174..4a87bac69 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenSQLWindowFunctionSuite.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenSQLWindowFunctionSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.execution
-import org.apache.gluten.execution.{WindowExecTransformer,
WindowGroupLimitExecTransformer}
+import org.apache.gluten.execution.{SortExecTransformer,
WindowExecTransformer, WindowGroupLimitExecTransformer}
import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.spark.sql.Row
@@ -134,6 +134,9 @@ class GlutenSQLWindowFunctionSuite extends
SQLWindowFunctionSuite with GlutenSQL
case _ => false
}
)
+ assert(
+ getExecutedPlan(df).collect { case s: SortExecTransformer if !s.global
=> s }.size == 1
+ )
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]