This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new e0602e950 [GLUTEN-6950][CORE] Move specific rules into backend modules
(#6953)
e0602e950 is described below
commit e0602e9506a150e051c68cc7c28c1213e87aee74
Author: Hongze Zhang <[email protected]>
AuthorDate: Wed Aug 21 16:53:50 2024 +0800
[GLUTEN-6950][CORE] Move specific rules into backend modules (#6953)
Closes #6950
---
.../gluten/backendsapi/clickhouse/CHBackend.scala | 2 -
.../gluten/backendsapi/clickhouse/CHRuleApi.scala | 2 -
.../CommonSubexpressionEliminateRule.scala | 3 +-
.../extension/CountDistinctWithoutExpand.scala | 0
.../MergeTwoPhasesHashBaseAggregate.scala | 10 +-
.../RewriteDateTimestampComparisonRule.scala | 2 -
.../extension/RewriteToDateExpresstionRule.scala | 0
.../gluten/backendsapi/velox/VeloxBackend.scala | 49 +-------
.../gluten/backendsapi/velox/VeloxRuleApi.scala | 5 +-
.../gluten/extension/EmptySchemaWorkaround.scala | 131 +++++++++++++++++++++
.../gluten/backendsapi/BackendSettingsApi.scala | 10 --
.../gluten/extension/columnar/FallbackRules.scala | 55 ---------
.../extension/columnar/validator/Validators.scala | 3 +-
.../sql/execution/FallbackStrategiesSuite.scala | 34 ++++--
.../GlutenReplaceHashWithSortAggSuite.scala | 4 +-
.../sql/execution/FallbackStrategiesSuite.scala | 33 ++++--
.../sql/execution/FallbackStrategiesSuite.scala | 33 ++++--
17 files changed, 215 insertions(+), 161 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 3c151df7e..86a69f842 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -380,8 +380,6 @@ object CHBackendSettings extends BackendSettingsApi with
Logging {
GlutenConfig.getConf.enableNativeWriter.getOrElse(false)
}
- override def mergeTwoPhasesHashBaseAggregateIfNeed(): Boolean = true
-
override def supportCartesianProductExec(): Boolean = true
override def supportHashBuildJoinTypeOnLeft: JoinType => Boolean = {
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index f4a7522d3..fb5147157 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -64,10 +64,8 @@ private object CHRuleApi {
injector.injectTransform(_ => RemoveTransitions)
injector.injectTransform(c => FallbackOnANSIMode.apply(c.session))
injector.injectTransform(c => FallbackMultiCodegens.apply(c.session))
- injector.injectTransform(c => PlanOneRowRelation.apply(c.session))
injector.injectTransform(_ => RewriteSubqueryBroadcast())
injector.injectTransform(c => FallbackBroadcastHashJoin.apply(c.session))
- injector.injectTransform(_ => FallbackEmptySchemaRelation())
injector.injectTransform(c =>
MergeTwoPhasesHashBaseAggregate.apply(c.session))
injector.injectTransform(_ => RewriteSparkPlanRulesManager())
injector.injectTransform(_ => AddFallbackTagRule())
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala
similarity index 98%
rename from
gluten-core/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala
rename to
backends-clickhouse/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala
index 29199eb0e..a3b74366f 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala
@@ -21,8 +21,7 @@ import org.apache.gluten.GlutenConfig
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
-import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
+import
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression,
AggregateFunction}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/CountDistinctWithoutExpand.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CountDistinctWithoutExpand.scala
similarity index 100%
rename from
gluten-core/src/main/scala/org/apache/gluten/extension/CountDistinctWithoutExpand.scala
rename to
backends-clickhouse/src/main/scala/org/apache/gluten/extension/CountDistinctWithoutExpand.scala
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MergeTwoPhasesHashAggregate.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/MergeTwoPhasesHashBaseAggregate.scala
similarity index 94%
rename from
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MergeTwoPhasesHashAggregate.scala
rename to
backends-clickhouse/src/main/scala/org/apache/gluten/extension/MergeTwoPhasesHashBaseAggregate.scala
index e19cd09a0..43adf27b4 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MergeTwoPhasesHashAggregate.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/MergeTwoPhasesHashBaseAggregate.scala
@@ -14,10 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.extension.columnar
+package org.apache.gluten.extension
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.aggregate.{Complete, Final,
Partial}
@@ -39,7 +38,7 @@ case class MergeTwoPhasesHashBaseAggregate(session:
SparkSession) extends Rule[S
val columnarConf: GlutenConfig = GlutenConfig.getConf
val scanOnly: Boolean = columnarConf.enableScanOnly
val enableColumnarHashAgg: Boolean = !scanOnly &&
columnarConf.enableColumnarHashAgg
- val replaceSortAggWithHashAgg =
BackendsApiManager.getSettings.replaceSortAggWithHashAgg
+ val replaceSortAggWithHashAgg: Boolean =
GlutenConfig.getConf.forceToUseHashAgg
private def isPartialAgg(partialAgg: BaseAggregateExec, finalAgg:
BaseAggregateExec): Boolean = {
// TODO: now it can not support to merge agg which there are the filters
in the aggregate exprs.
@@ -57,10 +56,7 @@ case class MergeTwoPhasesHashBaseAggregate(session:
SparkSession) extends Rule[S
}
override def apply(plan: SparkPlan): SparkPlan = {
- if (
- !enableColumnarHashAgg || !BackendsApiManager.getSettings
- .mergeTwoPhasesHashBaseAggregateIfNeed()
- ) {
+ if (!enableColumnarHashAgg) {
plan
} else {
plan.transformDown {
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala
similarity index 99%
rename from
gluten-core/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala
rename to
backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala
index ec1106955..ea92ddec2 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala
@@ -27,8 +27,6 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
-import java.lang.IllegalArgumentException
-
// For readable, people usually convert a unix timestamp into date, and
compare it with another
// date. For example
// select * from table where '2023-11-02' >= from_unixtime(unix_timestamp,
'yyyy-MM-dd')
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala
similarity index 100%
rename from
gluten-core/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala
rename to
backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 21175f20e..065adf338 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -28,12 +28,10 @@ import
org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat.{DwrfReadFo
import org.apache.gluten.utils._
import org.apache.spark.sql.catalyst.catalog.BucketSpec
-import org.apache.spark.sql.catalyst.expressions.{Alias, CumeDist, DenseRank,
Descending, EulerNumber, Expression, Lag, Lead, Literal, MakeYMInterval,
NamedExpression, NthValue, NTile, PercentRank, Pi, Rand, RangeFrame, Rank,
RowNumber, SortOrder, SparkPartitionID, SparkVersion, SpecialFrameBoundary,
SpecifiedWindowFrame, Uuid}
-import
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression,
ApproximatePercentile, Count, Sum}
+import org.apache.spark.sql.catalyst.expressions.{Alias, CumeDist, DenseRank,
Descending, Expression, Lag, Lead, NamedExpression, NthValue, NTile,
PercentRank, RangeFrame, Rank, RowNumber, SortOrder, SpecialFrameBoundary,
SpecifiedWindowFrame}
+import
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression,
ApproximatePercentile}
import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
-import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
-import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import
org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
import org.apache.spark.sql.execution.datasources.{FileFormat,
InsertIntoHadoopFsRelationCommand}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -443,49 +441,6 @@ object VeloxBackendSettings extends BackendSettingsApi {
}
}
- /**
- * Check whether a plan needs to be offloaded even though they have empty
input schema, e.g,
- * Sum(1), Count(1), rand(), etc.
- * @param plan:
- * The Spark plan to check.
- */
- private def mayNeedOffload(plan: SparkPlan): Boolean = {
- def checkExpr(expr: Expression): Boolean = {
- expr match {
- // Block directly falling back the below functions by
FallbackEmptySchemaRelation.
- case alias: Alias => checkExpr(alias.child)
- case _: Rand | _: Uuid | _: MakeYMInterval | _: SparkPartitionID | _:
EulerNumber | _: Pi |
- _: SparkVersion =>
- true
- case _ => false
- }
- }
-
- plan match {
- case exec: HashAggregateExec if exec.aggregateExpressions.nonEmpty =>
- // Check Sum(Literal) or Count(Literal).
- exec.aggregateExpressions.forall(
- expression => {
- val aggFunction = expression.aggregateFunction
- aggFunction match {
- case Sum(Literal(_, _), _) => true
- case Count(Seq(Literal(_, _))) => true
- case _ => false
- }
- })
- case p: ProjectExec if p.projectList.nonEmpty =>
- p.projectList.forall(checkExpr(_))
- case _ =>
- false
- }
- }
-
- override def fallbackOnEmptySchema(plan: SparkPlan): Boolean = {
- // Count(1) and Sum(1) are special cases that Velox backend can handle.
- // Do not fallback it and its children in the first place.
- !mayNeedOffload(plan)
- }
-
override def fallbackAggregateWithEmptyOutputChild(): Boolean = true
override def recreateJoinExecOnFallback(): Boolean = true
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 645407be8..f152da885 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -18,7 +18,8 @@ package org.apache.gluten.backendsapi.velox
import org.apache.gluten.backendsapi.RuleApi
import org.apache.gluten.datasource.ArrowConvertorRule
-import org.apache.gluten.extension._
+import org.apache.gluten.extension.{ArrowScanReplaceRule,
BloomFilterMightContainJointRewriteRule, CollectRewriteRule,
FlushableHashAggregateRule, HLLRewriteRule}
+import
org.apache.gluten.extension.EmptySchemaWorkaround.{FallbackEmptySchemaRelation,
PlanOneRowRelation}
import org.apache.gluten.extension.columnar._
import
org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow,
RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast, TransformPreOverrides}
import org.apache.gluten.extension.columnar.enumerated.EnumeratedTransform
@@ -61,7 +62,6 @@ private object VeloxRuleApi {
injector.injectTransform(c =>
BloomFilterMightContainJointRewriteRule.apply(c.session))
injector.injectTransform(c => ArrowScanReplaceRule.apply(c.session))
injector.injectTransform(_ => FallbackEmptySchemaRelation())
- injector.injectTransform(c =>
MergeTwoPhasesHashBaseAggregate.apply(c.session))
injector.injectTransform(_ => RewriteSparkPlanRulesManager())
injector.injectTransform(_ => AddFallbackTagRule())
injector.injectTransform(_ => TransformPreOverrides())
@@ -103,7 +103,6 @@ private object VeloxRuleApi {
injector.inject(_ => RewriteSubqueryBroadcast())
injector.inject(c =>
BloomFilterMightContainJointRewriteRule.apply(c.session))
injector.inject(c => ArrowScanReplaceRule.apply(c.session))
- injector.inject(c => MergeTwoPhasesHashBaseAggregate.apply(c.session))
// Gluten RAS: The RAS rule.
injector.inject(c => EnumeratedTransform(c.session, c.outputsColumnar))
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/extension/EmptySchemaWorkaround.scala
b/backends-velox/src/main/scala/org/apache/gluten/extension/EmptySchemaWorkaround.scala
new file mode 100644
index 000000000..3f34e7fc2
--- /dev/null
+++
b/backends-velox/src/main/scala/org/apache/gluten/extension/EmptySchemaWorkaround.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.gluten.GlutenConfig
+import org.apache.gluten.extension.columnar.FallbackTags
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference,
EulerNumber, Expression, Literal, MakeYMInterval, Pi, Rand, SparkPartitionID,
SparkVersion, Uuid}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{Count, Sum}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{ProjectExec, RDDScanExec, SparkPlan,
UnaryExecNode}
+import org.apache.spark.sql.execution.aggregate.HashAggregateExec
+import org.apache.spark.sql.execution.datasources.WriteFilesExec
+import org.apache.spark.sql.types.StringType
+
+/** Rules to make Velox backend work correctly with query plans that have
empty output schemas. */
+object EmptySchemaWorkaround {
+
+ /**
+ * This rule plans [[RDDScanExec]] with a fake schema to make gluten work,
because gluten does not
+ * support empty output relation, see [[FallbackEmptySchemaRelation]].
+ */
+ case class PlanOneRowRelation(spark: SparkSession) extends Rule[SparkPlan] {
+ override def apply(plan: SparkPlan): SparkPlan = {
+ if (!GlutenConfig.getConf.enableOneRowRelationColumnar) {
+ return plan
+ }
+
+ plan.transform {
+ // We should make sure the output does not change, e.g.
+ // Window
+ // OneRowRelation
+ case u: UnaryExecNode
+ if u.child.isInstanceOf[RDDScanExec] &&
+ u.child.asInstanceOf[RDDScanExec].name == "OneRowRelation" &&
+ u.outputSet != u.child.outputSet =>
+ val rdd = spark.sparkContext.parallelize(InternalRow(null) :: Nil, 1)
+ val attr = AttributeReference("fake_column", StringType)()
+ u.withNewChildren(RDDScanExec(attr :: Nil, rdd, "OneRowRelation") ::
Nil)
+ }
+ }
+ }
+
+ /**
+ * FIXME To be removed: Since Velox backend is the only one to use the
strategy, and we already
+ * support offloading zero-column batch in ColumnarBatchInIterator via PR
#3309.
+ *
+ * We'd make sure all Velox operators be able to handle zero-column input
correctly then remove
+ * the rule together with [[PlanOneRowRelation]].
+ */
+ case class FallbackEmptySchemaRelation() extends Rule[SparkPlan] {
+ override def apply(plan: SparkPlan): SparkPlan = plan.transformDown {
+ case p =>
+ if (fallbackOnEmptySchema(p)) {
+ if (p.children.exists(_.output.isEmpty)) {
+ // Some backends are not eligible to offload plan with zero-column
input.
+ // If any child have empty output, mark the plan and that child as
UNSUPPORTED.
+ FallbackTags.add(p, "at least one of its children has empty
output")
+ p.children.foreach {
+ child =>
+ if (child.output.isEmpty &&
!child.isInstanceOf[WriteFilesExec]) {
+ FallbackTags.add(child, "at least one of its children has
empty output")
+ }
+ }
+ }
+ }
+ p
+ }
+
+ private def fallbackOnEmptySchema(plan: SparkPlan): Boolean = {
+ // Count(1) and Sum(1) are special cases that Velox backend can handle.
+ // Do not fallback it and its children in the first place.
+ !mayNeedOffload(plan)
+ }
+
+ /**
+ * Check whether a plan needs to be offloaded even though they have empty
input schema, e.g,
+ * Sum(1), Count(1), rand(), etc.
+ * @param plan:
+ * The Spark plan to check.
+ *
+ * Since https://github.com/apache/incubator-gluten/pull/2749.
+ */
+ private def mayNeedOffload(plan: SparkPlan): Boolean = {
+ def checkExpr(expr: Expression): Boolean = {
+ expr match {
+ // Block directly falling back the below functions by
FallbackEmptySchemaRelation.
+ case alias: Alias => checkExpr(alias.child)
+ case _: Rand | _: Uuid | _: MakeYMInterval | _: SparkPartitionID |
_: EulerNumber |
+ _: Pi | _: SparkVersion =>
+ true
+ case _ => false
+ }
+ }
+
+ plan match {
+ case exec: HashAggregateExec if exec.aggregateExpressions.nonEmpty =>
+ // Check Sum(Literal) or Count(Literal).
+ exec.aggregateExpressions.forall(
+ expression => {
+ val aggFunction = expression.aggregateFunction
+ aggFunction match {
+ case Sum(Literal(_, _), _) => true
+ case Count(Seq(Literal(_, _))) => true
+ case _ => false
+ }
+ })
+ case p: ProjectExec if p.projectList.nonEmpty =>
+ p.projectList.forall(checkExpr(_))
+ case _ =>
+ false
+ }
+ }
+ }
+}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index c9a0301b8..c9205bae9 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -69,7 +69,6 @@ trait BackendSettingsApi {
case _ => false
}
def supportStructType(): Boolean = false
- def fallbackOnEmptySchema(plan: SparkPlan): Boolean = false
// Whether to fallback aggregate at the same time if its empty-output child
is fallen back.
def fallbackAggregateWithEmptyOutputChild(): Boolean = false
@@ -90,12 +89,6 @@ trait BackendSettingsApi {
def excludeScanExecFromCollapsedStage(): Boolean = false
def rescaleDecimalArithmetic: Boolean = false
- /**
- * Whether to replace sort agg with hash agg., e.g., sort agg will be used
in spark's planning for
- * string type input.
- */
- def replaceSortAggWithHashAgg: Boolean =
GlutenConfig.getConf.forceToUseHashAgg
-
/** Get the config prefix for each backend */
def getBackendConfigPrefix: String
@@ -147,9 +140,6 @@ trait BackendSettingsApi {
def supportSampleExec(): Boolean = false
- /** Merge two phases hash based aggregate if need */
- def mergeTwoPhasesHashBaseAggregateIfNeed(): Boolean = false
-
def supportColumnarArrowUdf(): Boolean = false
def generateHdfsConfForLibhdfs(): Boolean = false
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala
index f9eaa4179..6b043fbce 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala
@@ -27,8 +27,6 @@ import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.api.python.EvalPythonExecTransformer
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.{TreeNode, TreeNodeTag}
import org.apache.spark.sql.execution._
@@ -41,7 +39,6 @@ import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec,
BatchEvalPythonExec}
import org.apache.spark.sql.execution.window.{WindowExec,
WindowGroupLimitExecShim}
import org.apache.spark.sql.hive.HiveTableScanExecTransformer
-import org.apache.spark.sql.types.StringType
import org.apache.commons.lang3.exception.ExceptionUtils
@@ -241,58 +238,6 @@ case class FallbackMultiCodegens(session: SparkSession)
extends Rule[SparkPlan]
}
}
-/**
- * This rule plans [[RDDScanExec]] with a fake schema to make gluten work,
because gluten does not
- * support empty output relation, see [[FallbackEmptySchemaRelation]].
- */
-case class PlanOneRowRelation(spark: SparkSession) extends Rule[SparkPlan] {
- override def apply(plan: SparkPlan): SparkPlan = {
- if (!GlutenConfig.getConf.enableOneRowRelationColumnar) {
- return plan
- }
-
- plan.transform {
- // We should make sure the output does not change, e.g.
- // Window
- // OneRowRelation
- case u: UnaryExecNode
- if u.child.isInstanceOf[RDDScanExec] &&
- u.child.asInstanceOf[RDDScanExec].name == "OneRowRelation" &&
- u.outputSet != u.child.outputSet =>
- val rdd = spark.sparkContext.parallelize(InternalRow(null) :: Nil, 1)
- val attr = AttributeReference("fake_column", StringType)()
- u.withNewChildren(RDDScanExec(attr :: Nil, rdd, "OneRowRelation") ::
Nil)
- }
- }
-}
-
-/**
- * FIXME To be removed: Since Velox backend is the only one to use the
strategy, and we already
- * support offloading zero-column batch in ColumnarBatchInIterator via PR
#3309.
- *
- * We'd make sure all Velox operators be able to handle zero-column input
correctly then remove the
- * rule together with [[PlanOneRowRelation]].
- */
-case class FallbackEmptySchemaRelation() extends Rule[SparkPlan] {
- override def apply(plan: SparkPlan): SparkPlan = plan.transformDown {
- case p =>
- if (BackendsApiManager.getSettings.fallbackOnEmptySchema(p)) {
- if (p.children.exists(_.output.isEmpty)) {
- // Some backends are not eligible to offload plan with zero-column
input.
- // If any child have empty output, mark the plan and that child as
UNSUPPORTED.
- FallbackTags.add(p, "at least one of its children has empty output")
- p.children.foreach {
- child =>
- if (child.output.isEmpty && !child.isInstanceOf[WriteFilesExec])
{
- FallbackTags.add(child, "at least one of its children has
empty output")
- }
- }
- }
- }
- p
- }
-}
-
// This rule will try to convert a plan into plan transformer.
// The doValidate function will be called to check if the conversion is
supported.
// If false is returned or any unsupported exception is thrown, a row guard
will
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index a85cb163c..f1cb47923 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -143,8 +143,6 @@ object Validators {
case p: SortMergeJoinExec if !settings.supportSortMergeJoinExec() =>
fail(p)
case p: WriteFilesExec if !settings.enableNativeWriteFiles() =>
fail(p)
- case p: SortAggregateExec if !settings.replaceSortAggWithHashAgg =>
- fail(p)
case p: CartesianProductExec if !settings.supportCartesianProductExec()
=> fail(p)
case p: TakeOrderedAndProjectExec if
!settings.supportColumnarShuffleExec() => fail(p)
case _ => pass()
@@ -162,6 +160,7 @@ object Validators {
case p: FilterExec if !conf.enableColumnarFilter => fail(p)
case p: UnionExec if !conf.enableColumnarUnion => fail(p)
case p: ExpandExec if !conf.enableColumnarExpand => fail(p)
+ case p: SortAggregateExec if !conf.forceToUseHashAgg => fail(p)
case p: ShuffledHashJoinExec if !conf.enableColumnarShuffledHashJoin =>
fail(p)
case p: ShuffleExchangeExec if !conf.enableColumnarShuffle => fail(p)
case p: BroadcastExchangeExec if !conf.enableColumnarBroadcastExchange
=> fail(p)
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 1ce0025f2..a4da5c127 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -17,10 +17,9 @@
package org.apache.spark.sql.execution
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.BasicScanExecTransformer
import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy,
FallbackEmptySchemaRelation, FallbackTags, RemoveFallbackTagRule}
+import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy,
FallbackTags, RemoveFallbackTagRule}
import
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuilder
import
org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
@@ -31,6 +30,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.rules.Rule
class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
import FallbackStrategiesSuite._
@@ -133,13 +133,9 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
val rule = FallbackEmptySchemaRelation()
val newPlan = rule.apply(originalPlan)
val reason = FallbackTags.get(newPlan).reason()
- if (BackendsApiManager.getSettings.fallbackOnEmptySchema(newPlan)) {
- assert(
- reason.contains("fake reason") &&
- reason.contains("at least one of its children has empty output"))
- } else {
- assert(reason.contains("fake reason"))
- }
+ assert(
+ reason.contains("fake reason") &&
+ reason.contains("at least one of its children has empty output"))
}
testGluten("test enabling/disabling Gluten at thread level") {
@@ -173,6 +169,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
thread.join(10000)
}
}
+
private object FallbackStrategiesSuite {
def newRuleApplier(
spark: SparkSession,
@@ -189,6 +186,25 @@ private object FallbackStrategiesSuite {
)
}
+ // TODO: Generalize the code among shim versions.
+ case class FallbackEmptySchemaRelation() extends Rule[SparkPlan] {
+ override def apply(plan: SparkPlan): SparkPlan = plan.transformDown {
+ case p =>
+ if (p.children.exists(_.output.isEmpty)) {
+ // Some backends are not eligible to offload plan with zero-column
input.
+ // If any child have empty output, mark the plan and that child as
UNSUPPORTED.
+ FallbackTags.add(p, "at least one of its children has empty output")
+ p.children.foreach {
+ child =>
+ if (child.output.isEmpty) {
+ FallbackTags.add(child, "at least one of its children has
empty output")
+ }
+ }
+ }
+ p
+ }
+ }
+
case class LeafOp(override val supportsColumnar: Boolean = false) extends
LeafExecNode {
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
override def output: Seq[Attribute] = Seq.empty
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenReplaceHashWithSortAggSuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenReplaceHashWithSortAggSuite.scala
index 92e6fee97..f394b4687 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenReplaceHashWithSortAggSuite.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenReplaceHashWithSortAggSuite.scala
@@ -16,8 +16,8 @@
*/
package org.apache.spark.sql.execution
-import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.HashAggregateExecBaseTransformer
+import org.apache.gluten.utils.BackendTestUtils
import org.apache.spark.sql.{DataFrame, GlutenSQLTestsBaseTrait}
import org.apache.spark.sql.execution.aggregate.{ObjectHashAggregateExec,
SortAggregateExec}
@@ -99,7 +99,7 @@ class GlutenReplaceHashWithSortAggSuite
|)
|GROUP BY key
""".stripMargin
- if
(BackendsApiManager.getSettings.mergeTwoPhasesHashBaseAggregateIfNeed()) {
+ if (BackendTestUtils.isCHBackendLoaded()) {
checkAggs(query, 1, 0, 1, 0)
} else {
checkAggs(query, aggExprInfo._2, aggExprInfo._3, aggExprInfo._4,
aggExprInfo._5)
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 3acc9c4b3..a4da5c127 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -17,10 +17,9 @@
package org.apache.spark.sql.execution
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.BasicScanExecTransformer
import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy,
FallbackEmptySchemaRelation, FallbackTags, RemoveFallbackTagRule}
+import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy,
FallbackTags, RemoveFallbackTagRule}
import
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuilder
import
org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
@@ -31,6 +30,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.rules.Rule
class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
import FallbackStrategiesSuite._
@@ -133,13 +133,9 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
val rule = FallbackEmptySchemaRelation()
val newPlan = rule.apply(originalPlan)
val reason = FallbackTags.get(newPlan).reason()
- if (BackendsApiManager.getSettings.fallbackOnEmptySchema(newPlan)) {
- assert(
- reason.contains("fake reason") &&
- reason.contains("at least one of its children has empty output"))
- } else {
- assert(reason.contains("fake reason"))
- }
+ assert(
+ reason.contains("fake reason") &&
+ reason.contains("at least one of its children has empty output"))
}
testGluten("test enabling/disabling Gluten at thread level") {
@@ -190,6 +186,25 @@ private object FallbackStrategiesSuite {
)
}
+ // TODO: Generalize the code among shim versions.
+ case class FallbackEmptySchemaRelation() extends Rule[SparkPlan] {
+ override def apply(plan: SparkPlan): SparkPlan = plan.transformDown {
+ case p =>
+ if (p.children.exists(_.output.isEmpty)) {
+ // Some backends are not eligible to offload a plan with zero-column
input.
+ // If any child has empty output, mark the plan and that child as
UNSUPPORTED.
+ FallbackTags.add(p, "at least one of its children has empty output")
+ p.children.foreach {
+ child =>
+ if (child.output.isEmpty) {
+ FallbackTags.add(child, "at least one of its children has
empty output")
+ }
+ }
+ }
+ p
+ }
+ }
+
case class LeafOp(override val supportsColumnar: Boolean = false) extends
LeafExecNode {
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
override def output: Seq[Attribute] = Seq.empty
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index bcc4e829b..bbdeebfe6 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -17,10 +17,9 @@
package org.apache.spark.sql.execution
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.BasicScanExecTransformer
import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy,
FallbackEmptySchemaRelation, FallbackTags, RemoveFallbackTagRule}
+import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy,
FallbackTags, RemoveFallbackTagRule}
import
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuilder
import
org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
@@ -31,6 +30,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.rules.Rule
class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
import FallbackStrategiesSuite._
@@ -134,13 +134,9 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
val rule = FallbackEmptySchemaRelation()
val newPlan = rule.apply(originalPlan)
val reason = FallbackTags.get(newPlan).reason()
- if (BackendsApiManager.getSettings.fallbackOnEmptySchema(newPlan)) {
- assert(
- reason.contains("fake reason") &&
- reason.contains("at least one of its children has empty output"))
- } else {
- assert(reason.contains("fake reason"))
- }
+ assert(
+ reason.contains("fake reason") &&
+ reason.contains("at least one of its children has empty output"))
}
testGluten("test enabling/disabling Gluten at thread level") {
@@ -191,6 +187,25 @@ private object FallbackStrategiesSuite {
)
}
+ // TODO: Generalize the code among shim versions.
+ case class FallbackEmptySchemaRelation() extends Rule[SparkPlan] {
+ override def apply(plan: SparkPlan): SparkPlan = plan.transformDown {
+ case p =>
+ if (p.children.exists(_.output.isEmpty)) {
+ // Some backends are not eligible to offload a plan with zero-column
input.
+ // If any child has empty output, mark the plan and that child as
UNSUPPORTED.
+ FallbackTags.add(p, "at least one of its children has empty output")
+ p.children.foreach {
+ child =>
+ if (child.output.isEmpty) {
+ FallbackTags.add(child, "at least one of its children has
empty output")
+ }
+ }
+ }
+ p
+ }
+ }
+
case class LeafOp(override val supportsColumnar: Boolean = false) extends
LeafExecNode {
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
override def output: Seq[Attribute] = Seq.empty
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]