This is an automated email from the ASF dual-hosted git repository.
mingliang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 359f38c88f Revert "[GLUTEN-8966][VL] Propagate HashAggregate's ignoreNullKeys when possible" (#10852)
359f38c88f is described below
commit 359f38c88fc8f85cdade3fdb6e7ef40547f5859c
Author: Mingliang Zhu <[email protected]>
AuthorDate: Sat Oct 11 13:24:05 2025 +0800
Revert "[GLUTEN-8966][VL] Propagate HashAggregate's ignoreNullKeys when possible" (#10852)
---
.../gluten/backendsapi/velox/VeloxRuleApi.scala | 2 -
.../org/apache/gluten/config/VeloxConfig.scala | 12 ---
.../execution/HashAggregateExecTransformer.scala | 24 ++----
.../HashAggregateIgnoreNullKeysRule.scala | 89 ----------------------
.../execution/VeloxAggregateFunctionsSuite.scala | 22 ------
cpp/velox/substrait/SubstraitToVeloxPlan.cc | 5 --
docs/velox-configuration.md | 1 -
.../HashAggregateExecBaseTransformer.scala | 7 +-
8 files changed, 9 insertions(+), 153 deletions(-)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 57d86a18d4..485bf48f15 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -120,7 +120,6 @@ object VeloxRuleApi {
injector.injectPostTransform(_ => PullOutDuplicateProject)
injector.injectPostTransform(_ => CollapseProjectExecTransformer)
injector.injectPostTransform(c =>
FlushableHashAggregateRule.apply(c.session))
- injector.injectPostTransform(c =>
HashAggregateIgnoreNullKeysRule.apply(c.session))
injector.injectPostTransform(_ => CollectLimitTransformerRule())
injector.injectPostTransform(_ => CollectTailTransformerRule())
injector.injectPostTransform(_ => V2WritePostRule())
@@ -222,7 +221,6 @@ object VeloxRuleApi {
injector.injectPostTransform(_ => PullOutDuplicateProject)
injector.injectPostTransform(_ => CollapseProjectExecTransformer)
injector.injectPostTransform(c =>
FlushableHashAggregateRule.apply(c.session))
- injector.injectPostTransform(c =>
HashAggregateIgnoreNullKeysRule.apply(c.session))
injector.injectPostTransform(_ => CollectLimitTransformerRule())
injector.injectPostTransform(_ => CollectTailTransformerRule())
injector.injectPostTransform(_ => V2WritePostRule())
diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
index b4f4556fe1..1be6749e08 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
@@ -64,9 +64,6 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {
def veloxOrcScanEnabled: Boolean =
getConf(VELOX_ORC_SCAN_ENABLED)
- def enablePropagateIgnoreNullKeys: Boolean =
- getConf(VELOX_PROPAGATE_IGNORE_NULL_KEYS_ENABLED)
-
def floatingPointMode: String = getConf(FLOATING_POINT_MODE)
def enableRewriteCastArrayToString: Boolean =
@@ -588,15 +585,6 @@ object VeloxConfig extends ConfigRegistry {
.stringConf
.createWithDefault("")
- val VELOX_PROPAGATE_IGNORE_NULL_KEYS_ENABLED =
- buildConf("spark.gluten.sql.columnar.backend.velox.propagateIgnoreNullKeys")
- .doc(
- "If enabled, we will identify aggregation followed by an inner join " +
- "on the grouping keys, and mark the ignoreNullKeys flag to true to " +
- "avoid unnecessary aggregation on null keys.")
- .booleanConf
- .createWithDefault(true)
-
val FLOATING_POINT_MODE =
buildConf("spark.gluten.sql.columnar.backend.velox.floatingPointMode")
.doc(
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/HashAggregateExecTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/HashAggregateExecTransformer.scala
index 9bb72acb1a..e46d5340d0 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/HashAggregateExecTransformer.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/HashAggregateExecTransformer.scala
@@ -49,8 +49,7 @@ abstract class HashAggregateExecTransformer(
aggregateAttributes: Seq[Attribute],
initialInputBufferOffset: Int,
resultExpressions: Seq[NamedExpression],
- child: SparkPlan,
- ignoreNullKeys: Boolean)
+ child: SparkPlan)
extends HashAggregateExecBaseTransformer(
requiredChildDistributionExpressions,
groupingExpressions,
@@ -58,9 +57,7 @@ abstract class HashAggregateExecTransformer(
aggregateAttributes,
initialInputBufferOffset,
resultExpressions,
- child,
- ignoreNullKeys
- ) {
+ child) {
override def output: Seq[Attribute] = {
// TODO: We should have a check to make sure the returned schema actually matches the output
@@ -186,8 +183,7 @@ abstract class HashAggregateExecTransformer(
private def formatExtOptimizationString(isStreaming: Boolean): String = {
val isStreamingStr = if (isStreaming) "1" else "0"
val allowFlushStr = if (allowFlush) "1" else "0"
- val ignoreNullKeysStr = if (ignoreNullKeys) "1" else "0"
- s"isStreaming=$isStreamingStr\nallowFlush=$allowFlushStr\nignoreNullKeys=$ignoreNullKeysStr\n"
+ s"isStreaming=$isStreamingStr\nallowFlush=$allowFlushStr\n"
}
// Create aggregate function node and add to list.
@@ -661,8 +657,7 @@ case class RegularHashAggregateExecTransformer(
aggregateAttributes: Seq[Attribute],
initialInputBufferOffset: Int,
resultExpressions: Seq[NamedExpression],
- child: SparkPlan,
- ignoreNullKeys: Boolean = false)
+ child: SparkPlan)
extends HashAggregateExecTransformer(
requiredChildDistributionExpressions,
groupingExpressions,
@@ -670,9 +665,7 @@ case class RegularHashAggregateExecTransformer(
aggregateAttributes,
initialInputBufferOffset,
resultExpressions,
- child,
- ignoreNullKeys
- ) {
+ child) {
override protected def allowFlush: Boolean = false
@@ -696,8 +689,7 @@ case class FlushableHashAggregateExecTransformer(
aggregateAttributes: Seq[Attribute],
initialInputBufferOffset: Int,
resultExpressions: Seq[NamedExpression],
- child: SparkPlan,
- ignoreNullKeys: Boolean = false)
+ child: SparkPlan)
extends HashAggregateExecTransformer(
requiredChildDistributionExpressions,
groupingExpressions,
@@ -705,9 +697,7 @@ case class FlushableHashAggregateExecTransformer(
aggregateAttributes,
initialInputBufferOffset,
resultExpressions,
- child,
- ignoreNullKeys
- ) {
+ child) {
override protected def allowFlush: Boolean = true
diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/HashAggregateIgnoreNullKeysRule.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/HashAggregateIgnoreNullKeysRule.scala
deleted file mode 100644
index 1f57d93de0..0000000000
--- a/backends-velox/src/main/scala/org/apache/gluten/extension/HashAggregateIgnoreNullKeysRule.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.extension
-
-import org.apache.gluten.config.VeloxConfig
-import org.apache.gluten.execution._
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.plans.Inner
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec
-import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
-import org.apache.spark.sql.execution.joins.BaseJoinExec
-
-/**
- * To identify aggregates that the groupby key is used as inner join keys. In
this case, we can set
- * ignoreNullKeys to true when convert to velox's AggregateNode.
- */
-case class HashAggregateIgnoreNullKeysRule(session: SparkSession) extends
Rule[SparkPlan] {
- override def apply(plan: SparkPlan): SparkPlan = {
- if (!VeloxConfig.get.enablePropagateIgnoreNullKeys) {
- return plan
- }
- plan.transformUp {
- case join: BaseJoinExec if join.joinType == Inner =>
- val newLeftChild = setIgnoreKeysIfAggregateOnJoinKeys(join.left,
join.leftKeys)
- val newRightChild = setIgnoreKeysIfAggregateOnJoinKeys(join.right,
join.rightKeys)
- if (newLeftChild.fastEquals(join.left) &&
newRightChild.fastEquals(join.right)) {
- join
- } else {
- join.withNewChildren(Seq(newLeftChild, newRightChild))
- }
- case p => p
- }
- }
-
- private def setIgnoreKeysIfAggregateOnJoinKeys(
- plan: SparkPlan,
- joinKeys: Seq[Expression]): SparkPlan = plan match {
- case agg: HashAggregateExecTransformer =>
- val newChild = setIgnoreKeysIfAggregateOnJoinKeys(agg.child, joinKeys)
- val canIgnoreNullKeysRule = semanticEquals(agg.groupingExpressions,
joinKeys)
- agg match {
- case f: FlushableHashAggregateExecTransformer =>
- f.copy(ignoreNullKeys = canIgnoreNullKeysRule, child = newChild)
- case r: RegularHashAggregateExecTransformer =>
- r.copy(ignoreNullKeys = canIgnoreNullKeysRule, child = newChild)
- case _ => agg
- }
- case s: ShuffleQueryStageExec =>
- s.copy(plan = setIgnoreKeysIfAggregateOnJoinKeys(s.plan, joinKeys))
- case p if !canPropagate(p) => p
- case other =>
- other.withNewChildren(
- other.children.map(c => setIgnoreKeysIfAggregateOnJoinKeys(c,
joinKeys)))
- }
-
- private def canPropagate(plan: SparkPlan): Boolean = plan match {
- case _: ProjectExecTransformer => true
- case _: WholeStageTransformer => true
- case _: VeloxResizeBatchesExec => true
- case _: ShuffleExchangeLike => true
- case _: VeloxColumnarToRowExec => true
- case _: SortExecTransformer => true
- case _ => false
- }
-
- private def semanticEquals(aggExpression: Seq[Expression], joinKeys:
Seq[Expression]): Boolean = {
- aggExpression.size == joinKeys.size && aggExpression.zip(joinKeys).forall {
- case (e1: Expression, e2: Expression) => e1.semanticEquals(e2)
- }
- }
-}
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala
index f27039292f..1ad6677172 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala
@@ -1192,28 +1192,6 @@ class VeloxAggregateFunctionsDefaultSuite extends
VeloxAggregateFunctionsSuite {
}
}
}
-
- test("aggregate on join keys can set ignoreNullKeys") {
- val s =
- """
- |select count(1) from
- | (select l_orderkey, max(l_partkey) from lineitem group by
l_orderkey) a
- |inner join
- | (select l_orderkey from lineitem) b
- |on a.l_orderkey = b.l_orderkey
- |""".stripMargin
- withSQLConf(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key ->
"true") {
- runQueryAndCompare(s) {
- df =>
- val executedPlan = getExecutedPlan(df)
- assert(executedPlan.exists {
- case a: RegularHashAggregateExecTransformer if a.ignoreNullKeys =>
true
- case a: FlushableHashAggregateExecTransformer if a.ignoreNullKeys
=> true
- case _ => false
- })
- }
- }
- }
}
class VeloxAggregateFunctionsFlushSuite extends VeloxAggregateFunctionsSuite {
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index ba7a707568..7e4cd1f514 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -462,11 +462,6 @@ core::PlanNodePtr
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
preGroupingExprs.insert(preGroupingExprs.begin(),
veloxGroupingExprs.begin(), veloxGroupingExprs.end());
}
- if (aggRel.has_advanced_extension() &&
- SubstraitParser::configSetInOptimization(aggRel.advanced_extension(), "ignoreNullKeys=")) {
- ignoreNullKeys = true;
- }
-
// Get the output names of Aggregation.
std::vector<std::string> aggOutNames;
aggOutNames.reserve(aggRel.measures().size());
diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md
index 19db11c1d5..160f0c88ae 100644
--- a/docs/velox-configuration.md
+++ b/docs/velox-configuration.md
@@ -49,7 +49,6 @@ nav_order: 16
| spark.gluten.sql.columnar.backend.velox.memoryUseHugePages
| false | Use explicit huge pages for Velox memory allocation.
[...]
| spark.gluten.sql.columnar.backend.velox.orc.scan.enabled
| true | Enable velox orc scan. If disabled, vanilla spark orc
scan will be used.
[...]
| spark.gluten.sql.columnar.backend.velox.prefetchRowGroups
| 1 | Set the prefetch row groups for velox file scan
[...]
-| spark.gluten.sql.columnar.backend.velox.propagateIgnoreNullKeys
| true | If enabled, we will identify aggregation followed by
an inner join on the grouping keys, and mark the ignoreNullKeys flag to true to
avoid unnecessary aggregation on null keys.
[...]
| spark.gluten.sql.columnar.backend.velox.queryTraceEnabled
| false | Enable query tracing flag.
[...]
| spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs
| 3600000ms | The max time in ms to wait for memory reclaim.
[...]
| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput
| true | If true, combine small columnar batches together
before sending to shuffle. The default minimum output batch size is equal to
0.25 * spark.gluten.sql.columnar.maxBatchSize
[...]
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/HashAggregateExecBaseTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/HashAggregateExecBaseTransformer.scala
index 6e2d638b82..a4bcc6081e 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/HashAggregateExecBaseTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/HashAggregateExecBaseTransformer.scala
@@ -39,8 +39,7 @@ abstract class HashAggregateExecBaseTransformer(
aggregateAttributes: Seq[Attribute],
initialInputBufferOffset: Int,
resultExpressions: Seq[NamedExpression],
- child: SparkPlan,
- ignoreNullKeys: Boolean = false)
+ child: SparkPlan)
extends BaseAggregateExec
with UnaryTransformSupport {
@@ -87,13 +86,11 @@ abstract class HashAggregateExecBaseTransformer(
s"HashAggregateTransformer(keys=$keyString, " +
s"functions=$functionString, " +
s"isStreamingAgg=$isCapableForStreamingAggregation, " +
- s"ignoreNullKeys=$ignoreNullKeys, " +
s"output=$outputString)"
} else {
s"HashAggregateTransformer(keys=$keyString, " +
s"functions=$functionString, " +
- s"isStreamingAgg=$isCapableForStreamingAggregation, " +
- s"ignoreNullKeys=$ignoreNullKeys)"
+ s"isStreamingAgg=$isCapableForStreamingAggregation)"
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]