This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 044811567 Remove local sort for TopNRowNumber (#6381)
044811567 is described below

commit 04481156742f7293d82ecf3f2fd8c406d3dd35d3
Author: Xiduo You <[email protected]>
AuthorDate: Thu Jul 11 20:08:19 2024 +0800

    Remove local sort for TopNRowNumber (#6381)
    
    Co-authored-by: Kent Yao <[email protected]>
---
 .../gluten/backendsapi/velox/VeloxBackend.scala    |  6 +-
 .../gluten/backendsapi/BackendSettingsApi.scala    |  4 +-
 .../HashAggregateExecBaseTransformer.scala         |  5 +-
 .../org/apache/gluten/execution/SortUtils.scala    | 49 -----------
 .../gluten/execution/WindowExecTransformer.scala   |  6 +-
 .../WindowGroupLimitExecTransformer.scala          | 14 +++-
 .../extension/columnar/EliminateLocalSort.scala    | 94 ++++++++++++++++++++++
 .../columnar/EnsureLocalSortRequirements.scala     | 21 +++--
 .../extension/columnar/FallbackTagRule.scala       |  6 +-
 .../extension/columnar/OffloadSingleNode.scala     | 10 +--
 .../columnar/enumerated/EnumeratedApplier.scala    |  1 +
 .../enumerated/RasOffloadHashAggregate.scala       |  2 +-
 .../columnar/heuristic/HeuristicApplier.scala      |  1 +
 .../extension/columnar/rewrite/RewriteJoin.scala   |  5 +-
 .../execution/GlutenSQLWindowFunctionSuite.scala   |  5 +-
 15 files changed, 142 insertions(+), 87 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 63bfcf220..8d98c111a 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -485,7 +485,11 @@ object VeloxBackendSettings extends BackendSettingsApi {
 
   override def alwaysFailOnMapExpression(): Boolean = true
 
-  override def requiredChildOrderingForWindow(): Boolean = true
+  override def requiredChildOrderingForWindow(): Boolean = {
+    GlutenConfig.getConf.veloxColumnarWindowType.equals("streaming")
+  }
+
+  override def requiredChildOrderingForWindowGroupLimit(): Boolean = false
 
   override def staticPartitionWriteOnly(): Boolean = true
 
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 8ddcc7b7f..07ead8860 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -123,9 +123,9 @@ trait BackendSettingsApi {
 
   def alwaysFailOnMapExpression(): Boolean = false
 
-  def requiredChildOrderingForWindow(): Boolean = false
+  def requiredChildOrderingForWindow(): Boolean = true
 
-  def requiredChildOrderingForWindowGroupLimit(): Boolean = false
+  def requiredChildOrderingForWindowGroupLimit(): Boolean = true
 
   def staticPartitionWriteOnly(): Boolean = false
 
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/execution/HashAggregateExecBaseTransformer.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/execution/HashAggregateExecBaseTransformer.scala
index 9345b3a36..b200426d9 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/execution/HashAggregateExecBaseTransformer.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/execution/HashAggregateExecBaseTransformer.scala
@@ -185,8 +185,7 @@ object HashAggregateExecBaseTransformer {
     case a: SortAggregateExec => a.initialInputBufferOffset
   }
 
-  def from(agg: BaseAggregateExec)(
-      childConverter: SparkPlan => SparkPlan = p => p): 
HashAggregateExecBaseTransformer = {
+  def from(agg: BaseAggregateExec): HashAggregateExecBaseTransformer = {
     BackendsApiManager.getSparkPlanExecApiInstance
       .genHashAggregateExecTransformer(
         agg.requiredChildDistributionExpressions,
@@ -195,7 +194,7 @@ object HashAggregateExecBaseTransformer {
         agg.aggregateAttributes,
         getInitialInputBufferOffset(agg),
         agg.resultExpressions,
-        childConverter(agg.child)
+        agg.child
       )
   }
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/execution/SortUtils.scala 
b/gluten-core/src/main/scala/org/apache/gluten/execution/SortUtils.scala
deleted file mode 100644
index b01c71738..000000000
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/SortUtils.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.execution
-
-import org.apache.gluten.extension.columnar.rewrite.RewrittenNodeWall
-
-import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan}
-
-object SortUtils {
-  def dropPartialSort(plan: SparkPlan): SparkPlan = plan match {
-    case RewrittenNodeWall(p) => RewrittenNodeWall(dropPartialSort(p))
-    case PartialSortLike(child) => child
-    // from pre/post project-pulling
-    case ProjectLike(PartialSortLike(ProjectLike(child))) if plan.outputSet == 
child.outputSet =>
-      child
-    case ProjectLike(PartialSortLike(child)) => 
plan.withNewChildren(Seq(child))
-    case _ => plan
-  }
-}
-
-object PartialSortLike {
-  def unapply(plan: SparkPlan): Option[SparkPlan] = plan match {
-    case sort: SortExecTransformer if !sort.global => Some(sort.child)
-    case sort: SortExec if !sort.global => Some(sort.child)
-    case _ => None
-  }
-}
-
-object ProjectLike {
-  def unapply(plan: SparkPlan): Option[SparkPlan] = plan match {
-    case project: ProjectExecTransformer => Some(project.child)
-    case project: ProjectExec => Some(project.child)
-    case _ => None
-  }
-}
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala
index 6832221a4..628c08f29 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala
@@ -67,11 +67,7 @@ case class WindowExecTransformer(
   }
 
   override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-    if (
-      BackendsApiManager.getSettings.requiredChildOrderingForWindow()
-      && GlutenConfig.getConf.veloxColumnarWindowType.equals("streaming")
-    ) {
-      // Velox StreamingWindow need to require child order.
+    if (BackendsApiManager.getSettings.requiredChildOrderingForWindow()) {
       Seq(partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec)
     } else {
       Seq(Nil)
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/execution/WindowGroupLimitExecTransformer.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/execution/WindowGroupLimitExecTransformer.scala
index 46a4e1aa4..c93d01e7a 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/execution/WindowGroupLimitExecTransformer.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/execution/WindowGroupLimitExecTransformer.scala
@@ -66,14 +66,24 @@ case class WindowGroupLimitExecTransformer(
 
   override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
     if 
(BackendsApiManager.getSettings.requiredChildOrderingForWindowGroupLimit()) {
-      // Velox StreamingTopNRowNumber need to require child order.
       Seq(partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec)
     } else {
       Seq(Nil)
     }
   }
 
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def outputOrdering: Seq[SortOrder] = {
+    if (requiredChildOrdering.forall(_.isEmpty)) {
+      // The Velox backend `TopNRowNumber` does not require child ordering, 
because it
+      // uses hash table to store partition and use priority queue to track of 
top limit rows.
+      // Ideally, the output of `TopNRowNumber` is unordered but it is grouped 
for partition keys.
+      // To be safe, here we do not propagate the ordering.
+      // TODO: Make the framework aware of grouped data distribution
+      Nil
+    } else {
+      child.outputOrdering
+    }
+  }
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EliminateLocalSort.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EliminateLocalSort.scala
new file mode 100644
index 000000000..6a5c195e5
--- /dev/null
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EliminateLocalSort.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar
+
+import org.apache.gluten.execution.{HashAggregateExecBaseTransformer, 
ProjectExecTransformer, ShuffledHashJoinExecTransformerBase, 
SortExecTransformer, WindowExecTransformer, WindowGroupLimitExecTransformer}
+
+import org.apache.spark.sql.catalyst.expressions.SortOrder
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, 
UnaryExecNode}
+
+/**
+ * This rule is used to eliminate unnecessary local sort.
+ *
+ * This could happen if:
+ *   - Convert sort merge join to shuffled hash join
+ *   - Offload SortAggregate to native hash aggregate
+ *   - Offload WindowGroupLimit to native TopNRowNumber
+ *   - The columnar window type is `sort`
+ */
+object EliminateLocalSort extends Rule[SparkPlan] {
+  private def canEliminateLocalSort(p: SparkPlan): Boolean = p match {
+    case _: HashAggregateExecBaseTransformer => true
+    case _: ShuffledHashJoinExecTransformerBase => true
+    case _: WindowGroupLimitExecTransformer => true
+    case _: WindowExecTransformer => true
+    case _ => false
+  }
+
+  private def canThrough(p: SparkPlan): Boolean = p match {
+    case _: ProjectExec => true
+    case _: ProjectExecTransformer => true
+    case _ => false
+  }
+
+  private def orderingSatisfies(gChild: SparkPlan, requiredOrdering: 
Seq[SortOrder]): Boolean = {
+    SortOrder.orderingSatisfies(gChild.outputOrdering, requiredOrdering)
+  }
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+    plan.transformDown {
+      case p if canEliminateLocalSort(p) =>
+        val requiredChildOrdering = p.requiredChildOrdering
+        assert(requiredChildOrdering.size == p.children.size)
+        val newChildren = p.children.zipWithIndex.map {
+          case (SortWithChild(gChild), i) if orderingSatisfies(gChild, 
requiredChildOrdering(i)) =>
+            gChild
+          case (p: UnaryExecNode, i) if canThrough(p) =>
+            // There may be more than one project between target operator and 
sort,
+            // e.g., both hash aggregate and sort pull out project
+            p.child match {
+              case SortWithChild(gChild) if orderingSatisfies(gChild, 
requiredChildOrdering(i)) =>
+                p.withNewChildren(gChild :: Nil)
+              case _ => p
+            }
+          case p => p._1
+        }
+        p.withNewChildren(newChildren)
+    }
+  }
+}
+
+object SortWithChild {
+  def unapply(plan: SparkPlan): Option[SparkPlan] = {
+    plan match {
+      case p1 @ ProjectExec(_, SortExecTransformer(_, false, p2: ProjectExec, 
_))
+          if p1.outputSet == p2.child.outputSet =>
+        Some(p2.child)
+      case p1 @ ProjectExecTransformer(
+            _,
+            SortExecTransformer(_, false, p2: ProjectExecTransformer, _))
+          if p1.outputSet == p2.child.outputSet =>
+        Some(p2.child)
+      case SortExec(_, false, child, _) =>
+        Some(child)
+      case SortExecTransformer(_, false, child, _) =>
+        Some(child)
+      case _ => None
+    }
+  }
+}
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EnsureLocalSortRequirements.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EnsureLocalSortRequirements.scala
index afc29a51e..ff989d796 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EnsureLocalSortRequirements.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EnsureLocalSortRequirements.scala
@@ -17,7 +17,8 @@
 package org.apache.gluten.extension.columnar
 
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.execution.SortExecTransformer
+import 
org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverrides
+import 
org.apache.gluten.extension.columnar.rewrite.RewriteSparkPlanRulesManager
 
 import org.apache.spark.sql.catalyst.expressions.SortOrder
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -32,6 +33,8 @@ import org.apache.spark.sql.execution.{SortExec, SparkPlan}
  * SortAggregate with the same key. So, this rule adds local sort back if 
necessary.
  */
 object EnsureLocalSortRequirements extends Rule[SparkPlan] {
+  private lazy val offload = TransformPreOverrides.apply()
+
   private def addLocalSort(
       originalChild: SparkPlan,
       requiredOrdering: Seq[SortOrder]): SparkPlan = {
@@ -40,18 +43,12 @@ object EnsureLocalSortRequirements extends Rule[SparkPlan] {
       FallbackTags.add(newChild, "columnar Sort is not enabled in SortExec")
       newChild
     } else {
-      val newChildWithTransformer =
-        SortExecTransformer(
-          newChild.sortOrder,
-          newChild.global,
-          newChild.child,
-          newChild.testSpillFrequency)
-      val validationResult = newChildWithTransformer.doValidate()
-      if (validationResult.isValid) {
-        newChildWithTransformer
+      val rewrittenPlan = RewriteSparkPlanRulesManager.apply().apply(newChild)
+      if (rewrittenPlan.eq(newChild) && FallbackTags.nonEmpty(rewrittenPlan)) {
+        // The sort can not be offloaded
+        rewrittenPlan
       } else {
-        FallbackTags.add(newChild, validationResult)
-        newChild
+        offload.apply(rewrittenPlan)
       }
     }
   }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTagRule.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTagRule.scala
index d34cb0df4..ddc6870e6 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTagRule.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTagRule.scala
@@ -344,13 +344,13 @@ case class AddFallbackTagRule() extends Rule[SparkPlan] {
             .genFilterExecTransformer(plan.condition, plan.child)
           transformer.doValidate().tagOnFallback(plan)
         case plan: HashAggregateExec =>
-          val transformer = HashAggregateExecBaseTransformer.from(plan)()
+          val transformer = HashAggregateExecBaseTransformer.from(plan)
           transformer.doValidate().tagOnFallback(plan)
         case plan: SortAggregateExec =>
-          val transformer = HashAggregateExecBaseTransformer.from(plan)()
+          val transformer = HashAggregateExecBaseTransformer.from(plan)
           transformer.doValidate().tagOnFallback(plan)
         case plan: ObjectHashAggregateExec =>
-          val transformer = HashAggregateExecBaseTransformer.from(plan)()
+          val transformer = HashAggregateExecBaseTransformer.from(plan)
           transformer.doValidate().tagOnFallback(plan)
         case plan: UnionExec =>
           val transformer = ColumnarUnionExec(plan.children)
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
index 62c72af79..742c35341 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
@@ -87,9 +87,9 @@ case class OffloadAggregate() extends OffloadSingleNode with 
LogLevelUtil {
         case _: TransformSupport =>
           // If the child is transformable, transform aggregation as well.
           logDebug(s"Columnar Processing for ${plan.getClass} is currently 
supported.")
-          HashAggregateExecBaseTransformer.from(plan)()
+          HashAggregateExecBaseTransformer.from(plan)
         case p: SparkPlan if PlanUtil.isGlutenTableCache(p) =>
-          HashAggregateExecBaseTransformer.from(plan)()
+          HashAggregateExecBaseTransformer.from(plan)
         case _ =>
           // If the child is not transformable, do not transform the agg.
           FallbackTags.add(plan, "child output schema is empty")
@@ -97,7 +97,7 @@ case class OffloadAggregate() extends OffloadSingleNode with 
LogLevelUtil {
       }
     } else {
       logDebug(s"Columnar Processing for ${plan.getClass} is currently 
supported.")
-      HashAggregateExecBaseTransformer.from(plan)()
+      HashAggregateExecBaseTransformer.from(plan)
     }
   }
 }
@@ -425,10 +425,10 @@ object OffloadOthers {
           ColumnarCoalesceExec(plan.numPartitions, plan.child)
         case plan: SortAggregateExec =>
           logDebug(s"Columnar Processing for ${plan.getClass} is currently 
supported.")
-          
HashAggregateExecBaseTransformer.from(plan)(SortUtils.dropPartialSort)
+          HashAggregateExecBaseTransformer.from(plan)
         case plan: ObjectHashAggregateExec =>
           logDebug(s"Columnar Processing for ${plan.getClass} is currently 
supported.")
-          HashAggregateExecBaseTransformer.from(plan)()
+          HashAggregateExecBaseTransformer.from(plan)
         case plan: UnionExec =>
           val children = plan.children
           logDebug(s"Columnar Processing for ${plan.getClass} is currently 
supported.")
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
index 519db966c..3df0282f8 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
@@ -102,6 +102,7 @@ class EnumeratedApplier(session: SparkSession)
       List(
         (_: SparkSession) => RemoveNativeWriteFilesSortAndProject(),
         (spark: SparkSession) => RewriteTransformer(spark),
+        (_: SparkSession) => EliminateLocalSort,
         (_: SparkSession) => EnsureLocalSortRequirements,
         (_: SparkSession) => CollapseProjectExecTransformer
       ) :::
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffloadHashAggregate.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffloadHashAggregate.scala
index 6c125478b..272e05ca7 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffloadHashAggregate.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffloadHashAggregate.scala
@@ -24,7 +24,7 @@ import 
org.apache.spark.sql.execution.aggregate.HashAggregateExec
 object RasOffloadHashAggregate extends RasOffload {
   override def offload(node: SparkPlan): SparkPlan = node match {
     case agg: HashAggregateExec =>
-      val out = HashAggregateExecBaseTransformer.from(agg)()
+      val out = HashAggregateExecBaseTransformer.from(agg)
       out
     case other => other
   }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
index 03b2b66b0..738d67f4b 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
@@ -114,6 +114,7 @@ class HeuristicApplier(session: SparkSession)
       List(
         (_: SparkSession) => RemoveNativeWriteFilesSortAndProject(),
         (spark: SparkSession) => RewriteTransformer(spark),
+        (_: SparkSession) => EliminateLocalSort,
         (_: SparkSession) => EnsureLocalSortRequirements,
         (_: SparkSession) => CollapseProjectExecTransformer
       ) :::
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteJoin.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteJoin.scala
index e038f5af0..4fd420b02 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteJoin.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteJoin.scala
@@ -17,7 +17,6 @@
 package org.apache.gluten.extension.columnar.rewrite
 
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.execution.SortUtils
 
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, 
BuildSide, JoinSelectionHelper}
 import org.apache.spark.sql.catalyst.plans.JoinType
@@ -52,8 +51,8 @@ object RewriteJoin extends RewriteSingleNode with 
JoinSelectionHelper {
             smj.joinType,
             buildSide,
             smj.condition,
-            SortUtils.dropPartialSort(smj.left),
-            SortUtils.dropPartialSort(smj.right),
+            smj.left,
+            smj.right,
             smj.isSkewJoin
           )
         case _ => plan
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenSQLWindowFunctionSuite.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenSQLWindowFunctionSuite.scala
index 89a435174..4a87bac69 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenSQLWindowFunctionSuite.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenSQLWindowFunctionSuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.execution
 
-import org.apache.gluten.execution.{WindowExecTransformer, 
WindowGroupLimitExecTransformer}
+import org.apache.gluten.execution.{SortExecTransformer, 
WindowExecTransformer, WindowGroupLimitExecTransformer}
 
 import org.apache.spark.sql.GlutenSQLTestsTrait
 import org.apache.spark.sql.Row
@@ -134,6 +134,9 @@ class GlutenSQLWindowFunctionSuite extends 
SQLWindowFunctionSuite with GlutenSQL
           case _ => false
         }
       )
+      assert(
+        getExecutedPlan(df).collect { case s: SortExecTransformer if !s.global 
=> s }.size == 1
+      )
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to