This is an automated email from the ASF dual-hosted git repository.

kejia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new b7dd9a83c [VL] Eliminate pre local sort after offload date type range frame window (#6667)
b7dd9a83c is described below

commit b7dd9a83ca8956bb8d597b9643b247528fd95cf6
Author: Mingliang Zhu <[email protected]>
AuthorDate: Fri Aug 2 16:52:37 2024 +0800

    [VL] Eliminate pre local sort after offload date type range frame window (#6667)
---
 .../scala/org/apache/gluten/execution/TestOperator.scala     | 12 +++++++++---
 .../gluten/extension/columnar/EliminateLocalSort.scala       |  3 +++
 .../extension/columnar/enumerated/EnumeratedApplier.scala    |  2 +-
 .../extension/columnar/heuristic/HeuristicApplier.scala      |  2 +-
 4 files changed, 14 insertions(+), 5 deletions(-)

diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
index 7eb9df7be..5ca5087d9 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
@@ -351,14 +351,20 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla
   }
 
   test("window expression") {
-    Seq("sort", "streaming").foreach {
-      windowType =>
+    Seq(("sort", 0), ("streaming", 1)).foreach {
+      case (windowType, localSortSize) =>
         withSQLConf("spark.gluten.sql.columnar.backend.velox.window.type" -> 
windowType) {
           runQueryAndCompare(
             "select max(l_partkey) over" +
               " (partition by l_suppkey order by l_commitdate" +
               " RANGE BETWEEN 1 PRECEDING AND CURRENT ROW) from lineitem ") {
-            checkSparkOperatorMatch[WindowExecTransformer]
+            df =>
+              checkSparkOperatorMatch[WindowExecTransformer](df)
+              assert(
+                getExecutedPlan(df).collect {
+                  case s: SortExecTransformer if !s.global => s
+                }.size == localSortSize
+              )
           }
 
           runQueryAndCompare(
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EliminateLocalSort.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EliminateLocalSort.scala
index 6a5c195e5..03e7e4eb7 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EliminateLocalSort.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EliminateLocalSort.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, UnaryEx
  *   - Offload SortAggregate to native hash aggregate
  *   - Offload WindowGroupLimit to native TopNRowNumber
  *   - The columnar window type is `sort`
+ *   - Offload Window which has date type range frame
  */
 object EliminateLocalSort extends Rule[SparkPlan] {
   private def canEliminateLocalSort(p: SparkPlan): Boolean = p match {
@@ -37,6 +38,8 @@ object EliminateLocalSort extends Rule[SparkPlan] {
     case _: ShuffledHashJoinExecTransformerBase => true
     case _: WindowGroupLimitExecTransformer => true
     case _: WindowExecTransformer => true
+    case s: SortExec if s.global == false => true
+    case s: SortExecTransformer if s.global == false => true
     case _ => false
   }
 
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
index 3df0282f8..5cf3961c5 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
@@ -102,8 +102,8 @@ class EnumeratedApplier(session: SparkSession)
       List(
         (_: SparkSession) => RemoveNativeWriteFilesSortAndProject(),
         (spark: SparkSession) => RewriteTransformer(spark),
-        (_: SparkSession) => EliminateLocalSort,
         (_: SparkSession) => EnsureLocalSortRequirements,
+        (_: SparkSession) => EliminateLocalSort,
         (_: SparkSession) => CollapseProjectExecTransformer
       ) :::
       
BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarTransformRules()
 :::
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
index 738d67f4b..f776a1dcc 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
@@ -114,8 +114,8 @@ class HeuristicApplier(session: SparkSession)
       List(
         (_: SparkSession) => RemoveNativeWriteFilesSortAndProject(),
         (spark: SparkSession) => RewriteTransformer(spark),
-        (_: SparkSession) => EliminateLocalSort,
         (_: SparkSession) => EnsureLocalSortRequirements,
+        (_: SparkSession) => EliminateLocalSort,
         (_: SparkSession) => CollapseProjectExecTransformer
       ) :::
       
BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarTransformRules()
 :::


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to