This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new e0602e950 [GLUTEN-6950][CORE] Move specific rules into backend modules 
(#6953)
e0602e950 is described below

commit e0602e9506a150e051c68cc7c28c1213e87aee74
Author: Hongze Zhang <[email protected]>
AuthorDate: Wed Aug 21 16:53:50 2024 +0800

    [GLUTEN-6950][CORE] Move specific rules into backend modules (#6953)
    
    Closes #6950
---
 .../gluten/backendsapi/clickhouse/CHBackend.scala  |   2 -
 .../gluten/backendsapi/clickhouse/CHRuleApi.scala  |   2 -
 .../CommonSubexpressionEliminateRule.scala         |   3 +-
 .../extension/CountDistinctWithoutExpand.scala     |   0
 .../MergeTwoPhasesHashBaseAggregate.scala          |  10 +-
 .../RewriteDateTimestampComparisonRule.scala       |   2 -
 .../extension/RewriteToDateExpresstionRule.scala   |   0
 .../gluten/backendsapi/velox/VeloxBackend.scala    |  49 +-------
 .../gluten/backendsapi/velox/VeloxRuleApi.scala    |   5 +-
 .../gluten/extension/EmptySchemaWorkaround.scala   | 131 +++++++++++++++++++++
 .../gluten/backendsapi/BackendSettingsApi.scala    |  10 --
 .../gluten/extension/columnar/FallbackRules.scala  |  55 ---------
 .../extension/columnar/validator/Validators.scala  |   3 +-
 .../sql/execution/FallbackStrategiesSuite.scala    |  34 ++++--
 .../GlutenReplaceHashWithSortAggSuite.scala        |   4 +-
 .../sql/execution/FallbackStrategiesSuite.scala    |  33 ++++--
 .../sql/execution/FallbackStrategiesSuite.scala    |  33 ++++--
 17 files changed, 215 insertions(+), 161 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 3c151df7e..86a69f842 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -380,8 +380,6 @@ object CHBackendSettings extends BackendSettingsApi with 
Logging {
     GlutenConfig.getConf.enableNativeWriter.getOrElse(false)
   }
 
-  override def mergeTwoPhasesHashBaseAggregateIfNeed(): Boolean = true
-
   override def supportCartesianProductExec(): Boolean = true
 
   override def supportHashBuildJoinTypeOnLeft: JoinType => Boolean = {
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index f4a7522d3..fb5147157 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -64,10 +64,8 @@ private object CHRuleApi {
     injector.injectTransform(_ => RemoveTransitions)
     injector.injectTransform(c => FallbackOnANSIMode.apply(c.session))
     injector.injectTransform(c => FallbackMultiCodegens.apply(c.session))
-    injector.injectTransform(c => PlanOneRowRelation.apply(c.session))
     injector.injectTransform(_ => RewriteSubqueryBroadcast())
     injector.injectTransform(c => FallbackBroadcastHashJoin.apply(c.session))
-    injector.injectTransform(_ => FallbackEmptySchemaRelation())
     injector.injectTransform(c => 
MergeTwoPhasesHashBaseAggregate.apply(c.session))
     injector.injectTransform(_ => RewriteSparkPlanRulesManager())
     injector.injectTransform(_ => AddFallbackTagRule())
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala
similarity index 98%
rename from 
gluten-core/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala
rename to 
backends-clickhouse/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala
index 29199eb0e..a3b74366f 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala
@@ -21,8 +21,7 @@ import org.apache.gluten.GlutenConfig
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
-import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
+import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, 
AggregateFunction}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/CountDistinctWithoutExpand.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CountDistinctWithoutExpand.scala
similarity index 100%
rename from 
gluten-core/src/main/scala/org/apache/gluten/extension/CountDistinctWithoutExpand.scala
rename to 
backends-clickhouse/src/main/scala/org/apache/gluten/extension/CountDistinctWithoutExpand.scala
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MergeTwoPhasesHashAggregate.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/MergeTwoPhasesHashBaseAggregate.scala
similarity index 94%
rename from 
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MergeTwoPhasesHashAggregate.scala
rename to 
backends-clickhouse/src/main/scala/org/apache/gluten/extension/MergeTwoPhasesHashBaseAggregate.scala
index e19cd09a0..43adf27b4 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MergeTwoPhasesHashAggregate.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/MergeTwoPhasesHashBaseAggregate.scala
@@ -14,10 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.extension.columnar
+package org.apache.gluten.extension
 
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backendsapi.BackendsApiManager
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.aggregate.{Complete, Final, 
Partial}
@@ -39,7 +38,7 @@ case class MergeTwoPhasesHashBaseAggregate(session: 
SparkSession) extends Rule[S
   val columnarConf: GlutenConfig = GlutenConfig.getConf
   val scanOnly: Boolean = columnarConf.enableScanOnly
   val enableColumnarHashAgg: Boolean = !scanOnly && 
columnarConf.enableColumnarHashAgg
-  val replaceSortAggWithHashAgg = 
BackendsApiManager.getSettings.replaceSortAggWithHashAgg
+  val replaceSortAggWithHashAgg: Boolean = 
GlutenConfig.getConf.forceToUseHashAgg
 
   private def isPartialAgg(partialAgg: BaseAggregateExec, finalAgg: 
BaseAggregateExec): Boolean = {
     // TODO: now it can not support to merge agg which there are the filters 
in the aggregate exprs.
@@ -57,10 +56,7 @@ case class MergeTwoPhasesHashBaseAggregate(session: 
SparkSession) extends Rule[S
   }
 
   override def apply(plan: SparkPlan): SparkPlan = {
-    if (
-      !enableColumnarHashAgg || !BackendsApiManager.getSettings
-        .mergeTwoPhasesHashBaseAggregateIfNeed()
-    ) {
+    if (!enableColumnarHashAgg) {
       plan
     } else {
       plan.transformDown {
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala
similarity index 99%
rename from 
gluten-core/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala
rename to 
backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala
index ec1106955..ea92ddec2 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala
@@ -27,8 +27,6 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
-import java.lang.IllegalArgumentException
-
 // For readable, people usually convert a unix timestamp into date, and 
compare it with another
 // date. For example
 //    select * from table where '2023-11-02' >= from_unixtime(unix_timestamp, 
'yyyy-MM-dd')
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala
similarity index 100%
rename from 
gluten-core/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala
rename to 
backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 21175f20e..065adf338 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -28,12 +28,10 @@ import 
org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat.{DwrfReadFo
 import org.apache.gluten.utils._
 
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
-import org.apache.spark.sql.catalyst.expressions.{Alias, CumeDist, DenseRank, 
Descending, EulerNumber, Expression, Lag, Lead, Literal, MakeYMInterval, 
NamedExpression, NthValue, NTile, PercentRank, Pi, Rand, RangeFrame, Rank, 
RowNumber, SortOrder, SparkPartitionID, SparkVersion, SpecialFrameBoundary, 
SpecifiedWindowFrame, Uuid}
-import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, 
ApproximatePercentile, Count, Sum}
+import org.apache.spark.sql.catalyst.expressions.{Alias, CumeDist, DenseRank, 
Descending, Expression, Lag, Lead, NamedExpression, NthValue, NTile, 
PercentRank, RangeFrame, Rank, RowNumber, SortOrder, SpecialFrameBoundary, 
SpecifiedWindowFrame}
+import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, 
ApproximatePercentile}
 import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
-import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
-import org.apache.spark.sql.execution.aggregate.HashAggregateExec
 import 
org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
 import org.apache.spark.sql.execution.datasources.{FileFormat, 
InsertIntoHadoopFsRelationCommand}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -443,49 +441,6 @@ object VeloxBackendSettings extends BackendSettingsApi {
       }
   }
 
-  /**
-   * Check whether a plan needs to be offloaded even though they have empty 
input schema, e.g,
-   * Sum(1), Count(1), rand(), etc.
-   * @param plan:
-   *   The Spark plan to check.
-   */
-  private def mayNeedOffload(plan: SparkPlan): Boolean = {
-    def checkExpr(expr: Expression): Boolean = {
-      expr match {
-        // Block directly falling back the below functions by 
FallbackEmptySchemaRelation.
-        case alias: Alias => checkExpr(alias.child)
-        case _: Rand | _: Uuid | _: MakeYMInterval | _: SparkPartitionID | _: 
EulerNumber | _: Pi |
-            _: SparkVersion =>
-          true
-        case _ => false
-      }
-    }
-
-    plan match {
-      case exec: HashAggregateExec if exec.aggregateExpressions.nonEmpty =>
-        // Check Sum(Literal) or Count(Literal).
-        exec.aggregateExpressions.forall(
-          expression => {
-            val aggFunction = expression.aggregateFunction
-            aggFunction match {
-              case Sum(Literal(_, _), _) => true
-              case Count(Seq(Literal(_, _))) => true
-              case _ => false
-            }
-          })
-      case p: ProjectExec if p.projectList.nonEmpty =>
-        p.projectList.forall(checkExpr(_))
-      case _ =>
-        false
-    }
-  }
-
-  override def fallbackOnEmptySchema(plan: SparkPlan): Boolean = {
-    // Count(1) and Sum(1) are special cases that Velox backend can handle.
-    // Do not fallback it and its children in the first place.
-    !mayNeedOffload(plan)
-  }
-
   override def fallbackAggregateWithEmptyOutputChild(): Boolean = true
 
   override def recreateJoinExecOnFallback(): Boolean = true
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 645407be8..f152da885 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -18,7 +18,8 @@ package org.apache.gluten.backendsapi.velox
 
 import org.apache.gluten.backendsapi.RuleApi
 import org.apache.gluten.datasource.ArrowConvertorRule
-import org.apache.gluten.extension._
+import org.apache.gluten.extension.{ArrowScanReplaceRule, 
BloomFilterMightContainJointRewriteRule, CollectRewriteRule, 
FlushableHashAggregateRule, HLLRewriteRule}
+import 
org.apache.gluten.extension.EmptySchemaWorkaround.{FallbackEmptySchemaRelation, 
PlanOneRowRelation}
 import org.apache.gluten.extension.columnar._
 import 
org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow,
 RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast, TransformPreOverrides}
 import org.apache.gluten.extension.columnar.enumerated.EnumeratedTransform
@@ -61,7 +62,6 @@ private object VeloxRuleApi {
     injector.injectTransform(c => 
BloomFilterMightContainJointRewriteRule.apply(c.session))
     injector.injectTransform(c => ArrowScanReplaceRule.apply(c.session))
     injector.injectTransform(_ => FallbackEmptySchemaRelation())
-    injector.injectTransform(c => 
MergeTwoPhasesHashBaseAggregate.apply(c.session))
     injector.injectTransform(_ => RewriteSparkPlanRulesManager())
     injector.injectTransform(_ => AddFallbackTagRule())
     injector.injectTransform(_ => TransformPreOverrides())
@@ -103,7 +103,6 @@ private object VeloxRuleApi {
     injector.inject(_ => RewriteSubqueryBroadcast())
     injector.inject(c => 
BloomFilterMightContainJointRewriteRule.apply(c.session))
     injector.inject(c => ArrowScanReplaceRule.apply(c.session))
-    injector.inject(c => MergeTwoPhasesHashBaseAggregate.apply(c.session))
 
     // Gluten RAS: The RAS rule.
     injector.inject(c => EnumeratedTransform(c.session, c.outputsColumnar))
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/extension/EmptySchemaWorkaround.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/extension/EmptySchemaWorkaround.scala
new file mode 100644
index 000000000..3f34e7fc2
--- /dev/null
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/extension/EmptySchemaWorkaround.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.gluten.GlutenConfig
+import org.apache.gluten.extension.columnar.FallbackTags
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, 
EulerNumber, Expression, Literal, MakeYMInterval, Pi, Rand, SparkPartitionID, 
SparkVersion, Uuid}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{Count, Sum}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{ProjectExec, RDDScanExec, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.execution.aggregate.HashAggregateExec
+import org.apache.spark.sql.execution.datasources.WriteFilesExec
+import org.apache.spark.sql.types.StringType
+
+/** Rules to make Velox backend work correctly with query plans that have 
empty output schemas. */
+object EmptySchemaWorkaround {
+
+  /**
+   * This rule plans [[RDDScanExec]] with a fake schema to make gluten work, 
because gluten does not
+   * support empty output relation, see [[FallbackEmptySchemaRelation]].
+   */
+  case class PlanOneRowRelation(spark: SparkSession) extends Rule[SparkPlan] {
+    override def apply(plan: SparkPlan): SparkPlan = {
+      if (!GlutenConfig.getConf.enableOneRowRelationColumnar) {
+        return plan
+      }
+
+      plan.transform {
+        // We should make sure the output does not change, e.g.
+        // Window
+        //   OneRowRelation
+        case u: UnaryExecNode
+            if u.child.isInstanceOf[RDDScanExec] &&
+              u.child.asInstanceOf[RDDScanExec].name == "OneRowRelation" &&
+              u.outputSet != u.child.outputSet =>
+          val rdd = spark.sparkContext.parallelize(InternalRow(null) :: Nil, 1)
+          val attr = AttributeReference("fake_column", StringType)()
+          u.withNewChildren(RDDScanExec(attr :: Nil, rdd, "OneRowRelation") :: 
Nil)
+      }
+    }
+  }
+
+  /**
+   * FIXME To be removed: the Velox backend is the only one using this strategy, and 
we already
+   * support offloading zero-column batches in ColumnarBatchInIterator via PR 
#3309.
+   *
+   * Once all Velox operators are verified to handle zero-column input 
correctly, remove
+   * the rule together with [[PlanOneRowRelation]].
+   */
+  case class FallbackEmptySchemaRelation() extends Rule[SparkPlan] {
+    override def apply(plan: SparkPlan): SparkPlan = plan.transformDown {
+      case p =>
+        if (fallbackOnEmptySchema(p)) {
+          if (p.children.exists(_.output.isEmpty)) {
+            // Some backends are not eligible to offload plan with zero-column 
input.
+            // If any child has empty output, mark the plan and that child as 
UNSUPPORTED.
+            FallbackTags.add(p, "at least one of its children has empty 
output")
+            p.children.foreach {
+              child =>
+                if (child.output.isEmpty && 
!child.isInstanceOf[WriteFilesExec]) {
+                  FallbackTags.add(child, "at least one of its children has 
empty output")
+                }
+            }
+          }
+        }
+        p
+    }
+
+    private def fallbackOnEmptySchema(plan: SparkPlan): Boolean = {
+      // Count(1) and Sum(1) are special cases that Velox backend can handle.
+      // Do not fallback it and its children in the first place.
+      !mayNeedOffload(plan)
+    }
+
+    /**
+     * Check whether a plan needs to be offloaded even though it has an empty 
input schema, e.g.,
+     * Sum(1), Count(1), rand(), etc.
+     * @param plan:
+     *   The Spark plan to check.
+     *
+     * Since https://github.com/apache/incubator-gluten/pull/2749.
+     */
+    private def mayNeedOffload(plan: SparkPlan): Boolean = {
+      def checkExpr(expr: Expression): Boolean = {
+        expr match {
+          // Prevent FallbackEmptySchemaRelation from directly falling back the 
functions below.
+          case alias: Alias => checkExpr(alias.child)
+          case _: Rand | _: Uuid | _: MakeYMInterval | _: SparkPartitionID | 
_: EulerNumber |
+              _: Pi | _: SparkVersion =>
+            true
+          case _ => false
+        }
+      }
+
+      plan match {
+        case exec: HashAggregateExec if exec.aggregateExpressions.nonEmpty =>
+          // Check Sum(Literal) or Count(Literal).
+          exec.aggregateExpressions.forall(
+            expression => {
+              val aggFunction = expression.aggregateFunction
+              aggFunction match {
+                case Sum(Literal(_, _), _) => true
+                case Count(Seq(Literal(_, _))) => true
+                case _ => false
+              }
+            })
+        case p: ProjectExec if p.projectList.nonEmpty =>
+          p.projectList.forall(checkExpr(_))
+        case _ =>
+          false
+      }
+    }
+  }
+}
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index c9a0301b8..c9205bae9 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -69,7 +69,6 @@ trait BackendSettingsApi {
     case _ => false
   }
   def supportStructType(): Boolean = false
-  def fallbackOnEmptySchema(plan: SparkPlan): Boolean = false
 
   // Whether to fallback aggregate at the same time if its empty-output child 
is fallen back.
   def fallbackAggregateWithEmptyOutputChild(): Boolean = false
@@ -90,12 +89,6 @@ trait BackendSettingsApi {
   def excludeScanExecFromCollapsedStage(): Boolean = false
   def rescaleDecimalArithmetic: Boolean = false
 
-  /**
-   * Whether to replace sort agg with hash agg., e.g., sort agg will be used 
in spark's planning for
-   * string type input.
-   */
-  def replaceSortAggWithHashAgg: Boolean = 
GlutenConfig.getConf.forceToUseHashAgg
-
   /** Get the config prefix for each backend */
   def getBackendConfigPrefix: String
 
@@ -147,9 +140,6 @@ trait BackendSettingsApi {
 
   def supportSampleExec(): Boolean = false
 
-  /** Merge two phases hash based aggregate if need */
-  def mergeTwoPhasesHashBaseAggregateIfNeed(): Boolean = false
-
   def supportColumnarArrowUdf(): Boolean = false
 
   def generateHdfsConfForLibhdfs(): Boolean = false
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala
index f9eaa4179..6b043fbce 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala
@@ -27,8 +27,6 @@ import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.api.python.EvalPythonExecTransformer
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.{TreeNode, TreeNodeTag}
 import org.apache.spark.sql.execution._
@@ -41,7 +39,6 @@ import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec, 
BatchEvalPythonExec}
 import org.apache.spark.sql.execution.window.{WindowExec, 
WindowGroupLimitExecShim}
 import org.apache.spark.sql.hive.HiveTableScanExecTransformer
-import org.apache.spark.sql.types.StringType
 
 import org.apache.commons.lang3.exception.ExceptionUtils
 
@@ -241,58 +238,6 @@ case class FallbackMultiCodegens(session: SparkSession) 
extends Rule[SparkPlan]
   }
 }
 
-/**
- * This rule plans [[RDDScanExec]] with a fake schema to make gluten work, 
because gluten does not
- * support empty output relation, see [[FallbackEmptySchemaRelation]].
- */
-case class PlanOneRowRelation(spark: SparkSession) extends Rule[SparkPlan] {
-  override def apply(plan: SparkPlan): SparkPlan = {
-    if (!GlutenConfig.getConf.enableOneRowRelationColumnar) {
-      return plan
-    }
-
-    plan.transform {
-      // We should make sure the output does not change, e.g.
-      // Window
-      //   OneRowRelation
-      case u: UnaryExecNode
-          if u.child.isInstanceOf[RDDScanExec] &&
-            u.child.asInstanceOf[RDDScanExec].name == "OneRowRelation" &&
-            u.outputSet != u.child.outputSet =>
-        val rdd = spark.sparkContext.parallelize(InternalRow(null) :: Nil, 1)
-        val attr = AttributeReference("fake_column", StringType)()
-        u.withNewChildren(RDDScanExec(attr :: Nil, rdd, "OneRowRelation") :: 
Nil)
-    }
-  }
-}
-
-/**
- * FIXME To be removed: Since Velox backend is the only one to use the 
strategy, and we already
- * support offloading zero-column batch in ColumnarBatchInIterator via PR 
#3309.
- *
- * We'd make sure all Velox operators be able to handle zero-column input 
correctly then remove the
- * rule together with [[PlanOneRowRelation]].
- */
-case class FallbackEmptySchemaRelation() extends Rule[SparkPlan] {
-  override def apply(plan: SparkPlan): SparkPlan = plan.transformDown {
-    case p =>
-      if (BackendsApiManager.getSettings.fallbackOnEmptySchema(p)) {
-        if (p.children.exists(_.output.isEmpty)) {
-          // Some backends are not eligible to offload plan with zero-column 
input.
-          // If any child have empty output, mark the plan and that child as 
UNSUPPORTED.
-          FallbackTags.add(p, "at least one of its children has empty output")
-          p.children.foreach {
-            child =>
-              if (child.output.isEmpty && !child.isInstanceOf[WriteFilesExec]) 
{
-                FallbackTags.add(child, "at least one of its children has 
empty output")
-              }
-          }
-        }
-      }
-      p
-  }
-}
-
 // This rule will try to convert a plan into plan transformer.
 // The doValidate function will be called to check if the conversion is 
supported.
 // If false is returned or any unsupported exception is thrown, a row guard 
will
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index a85cb163c..f1cb47923 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -143,8 +143,6 @@ object Validators {
       case p: SortMergeJoinExec if !settings.supportSortMergeJoinExec() => 
fail(p)
       case p: WriteFilesExec if !settings.enableNativeWriteFiles() =>
         fail(p)
-      case p: SortAggregateExec if !settings.replaceSortAggWithHashAgg =>
-        fail(p)
       case p: CartesianProductExec if !settings.supportCartesianProductExec() 
=> fail(p)
       case p: TakeOrderedAndProjectExec if 
!settings.supportColumnarShuffleExec() => fail(p)
       case _ => pass()
@@ -162,6 +160,7 @@ object Validators {
       case p: FilterExec if !conf.enableColumnarFilter => fail(p)
       case p: UnionExec if !conf.enableColumnarUnion => fail(p)
       case p: ExpandExec if !conf.enableColumnarExpand => fail(p)
+      case p: SortAggregateExec if !conf.forceToUseHashAgg => fail(p)
       case p: ShuffledHashJoinExec if !conf.enableColumnarShuffledHashJoin => 
fail(p)
       case p: ShuffleExchangeExec if !conf.enableColumnarShuffle => fail(p)
       case p: BroadcastExchangeExec if !conf.enableColumnarBroadcastExchange 
=> fail(p)
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 1ce0025f2..a4da5c127 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -17,10 +17,9 @@
 package org.apache.spark.sql.execution
 
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.execution.BasicScanExecTransformer
 import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy, 
FallbackEmptySchemaRelation, FallbackTags, RemoveFallbackTagRule}
+import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy, 
FallbackTags, RemoveFallbackTagRule}
 import 
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuilder
 import 
org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
 import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
@@ -31,6 +30,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.rules.Rule
 
 class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
   import FallbackStrategiesSuite._
@@ -133,13 +133,9 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
     val rule = FallbackEmptySchemaRelation()
     val newPlan = rule.apply(originalPlan)
     val reason = FallbackTags.get(newPlan).reason()
-    if (BackendsApiManager.getSettings.fallbackOnEmptySchema(newPlan)) {
-      assert(
-        reason.contains("fake reason") &&
-          reason.contains("at least one of its children has empty output"))
-    } else {
-      assert(reason.contains("fake reason"))
-    }
+    assert(
+      reason.contains("fake reason") &&
+        reason.contains("at least one of its children has empty output"))
   }
 
   testGluten("test enabling/disabling Gluten at thread level") {
@@ -173,6 +169,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
     thread.join(10000)
   }
 }
+
 private object FallbackStrategiesSuite {
   def newRuleApplier(
       spark: SparkSession,
@@ -189,6 +186,25 @@ private object FallbackStrategiesSuite {
     )
   }
 
+  // TODO: Generalize the code among shim versions.
+  case class FallbackEmptySchemaRelation() extends Rule[SparkPlan] {
+    override def apply(plan: SparkPlan): SparkPlan = plan.transformDown {
+      case p =>
+        if (p.children.exists(_.output.isEmpty)) {
+          // Some backends are not eligible to offload plan with zero-column 
input.
+          // If any child has empty output, mark the plan and that child as 
UNSUPPORTED.
+          FallbackTags.add(p, "at least one of its children has empty output")
+          p.children.foreach {
+            child =>
+              if (child.output.isEmpty) {
+                FallbackTags.add(child, "at least one of its children has 
empty output")
+              }
+          }
+        }
+        p
+    }
+  }
+
   case class LeafOp(override val supportsColumnar: Boolean = false) extends 
LeafExecNode {
     override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
     override def output: Seq[Attribute] = Seq.empty
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenReplaceHashWithSortAggSuite.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenReplaceHashWithSortAggSuite.scala
index 92e6fee97..f394b4687 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenReplaceHashWithSortAggSuite.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenReplaceHashWithSortAggSuite.scala
@@ -16,8 +16,8 @@
  */
 package org.apache.spark.sql.execution
 
-import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.execution.HashAggregateExecBaseTransformer
+import org.apache.gluten.utils.BackendTestUtils
 
 import org.apache.spark.sql.{DataFrame, GlutenSQLTestsBaseTrait}
 import org.apache.spark.sql.execution.aggregate.{ObjectHashAggregateExec, SortAggregateExec}
@@ -99,7 +99,7 @@ class GlutenReplaceHashWithSortAggSuite
                |)
                |GROUP BY key
            """.stripMargin
-          if (BackendsApiManager.getSettings.mergeTwoPhasesHashBaseAggregateIfNeed()) {
+          if (BackendTestUtils.isCHBackendLoaded()) {
             checkAggs(query, 1, 0, 1, 0)
           } else {
             checkAggs(query, aggExprInfo._2, aggExprInfo._3, aggExprInfo._4, aggExprInfo._5)
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 3acc9c4b3..a4da5c127 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -17,10 +17,9 @@
 package org.apache.spark.sql.execution
 
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.execution.BasicScanExecTransformer
 import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy, FallbackEmptySchemaRelation, FallbackTags, RemoveFallbackTagRule}
+import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy, FallbackTags, RemoveFallbackTagRule}
 import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuilder
 import org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
 import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
@@ -31,6 +30,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.rules.Rule
 
 class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
   import FallbackStrategiesSuite._
@@ -133,13 +133,9 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
     val rule = FallbackEmptySchemaRelation()
     val newPlan = rule.apply(originalPlan)
     val reason = FallbackTags.get(newPlan).reason()
-    if (BackendsApiManager.getSettings.fallbackOnEmptySchema(newPlan)) {
-      assert(
-        reason.contains("fake reason") &&
-          reason.contains("at least one of its children has empty output"))
-    } else {
-      assert(reason.contains("fake reason"))
-    }
+    assert(
+      reason.contains("fake reason") &&
+        reason.contains("at least one of its children has empty output"))
   }
 
   testGluten("test enabling/disabling Gluten at thread level") {
@@ -190,6 +186,25 @@ private object FallbackStrategiesSuite {
     )
   }
 
+  // TODO: Generalize the code among shim versions.
+  case class FallbackEmptySchemaRelation() extends Rule[SparkPlan] {
+    override def apply(plan: SparkPlan): SparkPlan = plan.transformDown {
+      case p =>
+        if (p.children.exists(_.output.isEmpty)) {
+          // Some backends are not eligible to offload plan with zero-column input.
+          // If any child have empty output, mark the plan and that child as UNSUPPORTED.
+          FallbackTags.add(p, "at least one of its children has empty output")
+          p.children.foreach {
+            child =>
+              if (child.output.isEmpty) {
+                FallbackTags.add(child, "at least one of its children has empty output")
+              }
+          }
+        }
+        p
+    }
+  }
+
   case class LeafOp(override val supportsColumnar: Boolean = false) extends LeafExecNode {
     override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException()
     override def output: Seq[Attribute] = Seq.empty
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index bcc4e829b..bbdeebfe6 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -17,10 +17,9 @@
 package org.apache.spark.sql.execution
 
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.execution.BasicScanExecTransformer
 import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy, FallbackEmptySchemaRelation, FallbackTags, RemoveFallbackTagRule}
+import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy, FallbackTags, RemoveFallbackTagRule}
 import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuilder
 import org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
 import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
@@ -31,6 +30,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.rules.Rule
 
 class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
   import FallbackStrategiesSuite._
@@ -134,13 +134,9 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
     val rule = FallbackEmptySchemaRelation()
     val newPlan = rule.apply(originalPlan)
     val reason = FallbackTags.get(newPlan).reason()
-    if (BackendsApiManager.getSettings.fallbackOnEmptySchema(newPlan)) {
-      assert(
-        reason.contains("fake reason") &&
-          reason.contains("at least one of its children has empty output"))
-    } else {
-      assert(reason.contains("fake reason"))
-    }
+    assert(
+      reason.contains("fake reason") &&
+        reason.contains("at least one of its children has empty output"))
   }
 
   testGluten("test enabling/disabling Gluten at thread level") {
@@ -191,6 +187,25 @@ private object FallbackStrategiesSuite {
     )
   }
 
+  // TODO: Generalize the code among shim versions.
+  case class FallbackEmptySchemaRelation() extends Rule[SparkPlan] {
+    override def apply(plan: SparkPlan): SparkPlan = plan.transformDown {
+      case p =>
+        if (p.children.exists(_.output.isEmpty)) {
+          // Some backends are not eligible to offload plan with zero-column input.
+          // If any child have empty output, mark the plan and that child as UNSUPPORTED.
+          FallbackTags.add(p, "at least one of its children has empty output")
+          p.children.foreach {
+            child =>
+              if (child.output.isEmpty) {
+                FallbackTags.add(child, "at least one of its children has empty output")
+              }
+          }
+        }
+        p
+    }
+  }
+
   case class LeafOp(override val supportsColumnar: Boolean = false) extends LeafExecNode {
     override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException()
     override def output: Seq[Attribute] = Seq.empty


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to