This is an automated email from the ASF dual-hosted git repository.

mingliang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 359f38c88f Revert "[GLUTEN-8966][VL] Propagate HashAggregate's ignoreNullKeys when possible" (#10852)
359f38c88f is described below

commit 359f38c88fc8f85cdade3fdb6e7ef40547f5859c
Author: Mingliang Zhu <[email protected]>
AuthorDate: Sat Oct 11 13:24:05 2025 +0800

    Revert "[GLUTEN-8966][VL] Propagate HashAggregate's ignoreNullKeys when possible" (#10852)
---
 .../gluten/backendsapi/velox/VeloxRuleApi.scala    |  2 -
 .../org/apache/gluten/config/VeloxConfig.scala     | 12 ---
 .../execution/HashAggregateExecTransformer.scala   | 24 ++----
 .../HashAggregateIgnoreNullKeysRule.scala          | 89 ----------------------
 .../execution/VeloxAggregateFunctionsSuite.scala   | 22 ------
 cpp/velox/substrait/SubstraitToVeloxPlan.cc        |  5 --
 docs/velox-configuration.md                        |  1 -
 .../HashAggregateExecBaseTransformer.scala         |  7 +-
 8 files changed, 9 insertions(+), 153 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 57d86a18d4..485bf48f15 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -120,7 +120,6 @@ object VeloxRuleApi {
     injector.injectPostTransform(_ => PullOutDuplicateProject)
     injector.injectPostTransform(_ => CollapseProjectExecTransformer)
     injector.injectPostTransform(c => FlushableHashAggregateRule.apply(c.session))
-    injector.injectPostTransform(c => HashAggregateIgnoreNullKeysRule.apply(c.session))
     injector.injectPostTransform(_ => CollectLimitTransformerRule())
     injector.injectPostTransform(_ => CollectTailTransformerRule())
     injector.injectPostTransform(_ => V2WritePostRule())
@@ -222,7 +221,6 @@ object VeloxRuleApi {
     injector.injectPostTransform(_ => PullOutDuplicateProject)
     injector.injectPostTransform(_ => CollapseProjectExecTransformer)
     injector.injectPostTransform(c => FlushableHashAggregateRule.apply(c.session))
-    injector.injectPostTransform(c => HashAggregateIgnoreNullKeysRule.apply(c.session))
     injector.injectPostTransform(_ => CollectLimitTransformerRule())
     injector.injectPostTransform(_ => CollectTailTransformerRule())
     injector.injectPostTransform(_ => V2WritePostRule())
diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
index b4f4556fe1..1be6749e08 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
@@ -64,9 +64,6 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {
   def veloxOrcScanEnabled: Boolean =
     getConf(VELOX_ORC_SCAN_ENABLED)
 
-  def enablePropagateIgnoreNullKeys: Boolean =
-    getConf(VELOX_PROPAGATE_IGNORE_NULL_KEYS_ENABLED)
-
   def floatingPointMode: String = getConf(FLOATING_POINT_MODE)
 
   def enableRewriteCastArrayToString: Boolean =
@@ -588,15 +585,6 @@ object VeloxConfig extends ConfigRegistry {
       .stringConf
       .createWithDefault("")
 
-  val VELOX_PROPAGATE_IGNORE_NULL_KEYS_ENABLED =
-    buildConf("spark.gluten.sql.columnar.backend.velox.propagateIgnoreNullKeys")
-      .doc(
-        "If enabled, we will identify aggregation followed by an inner join " +
-          "on the grouping keys, and mark the ignoreNullKeys flag to true to " +
-          "avoid unnecessary aggregation on null keys.")
-      .booleanConf
-      .createWithDefault(true)
-
   val FLOATING_POINT_MODE =
     buildConf("spark.gluten.sql.columnar.backend.velox.floatingPointMode")
       .doc(
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/HashAggregateExecTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/HashAggregateExecTransformer.scala
index 9bb72acb1a..e46d5340d0 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/HashAggregateExecTransformer.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/HashAggregateExecTransformer.scala
@@ -49,8 +49,7 @@ abstract class HashAggregateExecTransformer(
     aggregateAttributes: Seq[Attribute],
     initialInputBufferOffset: Int,
     resultExpressions: Seq[NamedExpression],
-    child: SparkPlan,
-    ignoreNullKeys: Boolean)
+    child: SparkPlan)
   extends HashAggregateExecBaseTransformer(
     requiredChildDistributionExpressions,
     groupingExpressions,
@@ -58,9 +57,7 @@ abstract class HashAggregateExecTransformer(
     aggregateAttributes,
     initialInputBufferOffset,
     resultExpressions,
-    child,
-    ignoreNullKeys
-  ) {
+    child) {
 
   override def output: Seq[Attribute] = {
     // TODO: We should have a check to make sure the returned schema actually matches the output
@@ -186,8 +183,7 @@ abstract class HashAggregateExecTransformer(
   private def formatExtOptimizationString(isStreaming: Boolean): String = {
     val isStreamingStr = if (isStreaming) "1" else "0"
     val allowFlushStr = if (allowFlush) "1" else "0"
-    val ignoreNullKeysStr = if (ignoreNullKeys) "1" else "0"
-    s"isStreaming=$isStreamingStr\nallowFlush=$allowFlushStr\nignoreNullKeys=$ignoreNullKeysStr\n"
+    s"isStreaming=$isStreamingStr\nallowFlush=$allowFlushStr\n"
   }
 
   // Create aggregate function node and add to list.
@@ -661,8 +657,7 @@ case class RegularHashAggregateExecTransformer(
     aggregateAttributes: Seq[Attribute],
     initialInputBufferOffset: Int,
     resultExpressions: Seq[NamedExpression],
-    child: SparkPlan,
-    ignoreNullKeys: Boolean = false)
+    child: SparkPlan)
   extends HashAggregateExecTransformer(
     requiredChildDistributionExpressions,
     groupingExpressions,
@@ -670,9 +665,7 @@ case class RegularHashAggregateExecTransformer(
     aggregateAttributes,
     initialInputBufferOffset,
     resultExpressions,
-    child,
-    ignoreNullKeys
-  ) {
+    child) {
 
   override protected def allowFlush: Boolean = false
 
@@ -696,8 +689,7 @@ case class FlushableHashAggregateExecTransformer(
     aggregateAttributes: Seq[Attribute],
     initialInputBufferOffset: Int,
     resultExpressions: Seq[NamedExpression],
-    child: SparkPlan,
-    ignoreNullKeys: Boolean = false)
+    child: SparkPlan)
   extends HashAggregateExecTransformer(
     requiredChildDistributionExpressions,
     groupingExpressions,
@@ -705,9 +697,7 @@ case class FlushableHashAggregateExecTransformer(
     aggregateAttributes,
     initialInputBufferOffset,
     resultExpressions,
-    child,
-    ignoreNullKeys
-  ) {
+    child) {
 
   override protected def allowFlush: Boolean = true
 
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/extension/HashAggregateIgnoreNullKeysRule.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/extension/HashAggregateIgnoreNullKeysRule.scala
deleted file mode 100644
index 1f57d93de0..0000000000
--- 
a/backends-velox/src/main/scala/org/apache/gluten/extension/HashAggregateIgnoreNullKeysRule.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.extension
-
-import org.apache.gluten.config.VeloxConfig
-import org.apache.gluten.execution._
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.plans.Inner
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec
-import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
-import org.apache.spark.sql.execution.joins.BaseJoinExec
-
-/**
- * To identify aggregates that the groupby key is used as inner join keys. In 
this case, we can set
- * ignoreNullKeys to true when convert to velox's AggregateNode.
- */
-case class HashAggregateIgnoreNullKeysRule(session: SparkSession) extends 
Rule[SparkPlan] {
-  override def apply(plan: SparkPlan): SparkPlan = {
-    if (!VeloxConfig.get.enablePropagateIgnoreNullKeys) {
-      return plan
-    }
-    plan.transformUp {
-      case join: BaseJoinExec if join.joinType == Inner =>
-        val newLeftChild = setIgnoreKeysIfAggregateOnJoinKeys(join.left, 
join.leftKeys)
-        val newRightChild = setIgnoreKeysIfAggregateOnJoinKeys(join.right, 
join.rightKeys)
-        if (newLeftChild.fastEquals(join.left) && 
newRightChild.fastEquals(join.right)) {
-          join
-        } else {
-          join.withNewChildren(Seq(newLeftChild, newRightChild))
-        }
-      case p => p
-    }
-  }
-
-  private def setIgnoreKeysIfAggregateOnJoinKeys(
-      plan: SparkPlan,
-      joinKeys: Seq[Expression]): SparkPlan = plan match {
-    case agg: HashAggregateExecTransformer =>
-      val newChild = setIgnoreKeysIfAggregateOnJoinKeys(agg.child, joinKeys)
-      val canIgnoreNullKeysRule = semanticEquals(agg.groupingExpressions, 
joinKeys)
-      agg match {
-        case f: FlushableHashAggregateExecTransformer =>
-          f.copy(ignoreNullKeys = canIgnoreNullKeysRule, child = newChild)
-        case r: RegularHashAggregateExecTransformer =>
-          r.copy(ignoreNullKeys = canIgnoreNullKeysRule, child = newChild)
-        case _ => agg
-      }
-    case s: ShuffleQueryStageExec =>
-      s.copy(plan = setIgnoreKeysIfAggregateOnJoinKeys(s.plan, joinKeys))
-    case p if !canPropagate(p) => p
-    case other =>
-      other.withNewChildren(
-        other.children.map(c => setIgnoreKeysIfAggregateOnJoinKeys(c, 
joinKeys)))
-  }
-
-  private def canPropagate(plan: SparkPlan): Boolean = plan match {
-    case _: ProjectExecTransformer => true
-    case _: WholeStageTransformer => true
-    case _: VeloxResizeBatchesExec => true
-    case _: ShuffleExchangeLike => true
-    case _: VeloxColumnarToRowExec => true
-    case _: SortExecTransformer => true
-    case _ => false
-  }
-
-  private def semanticEquals(aggExpression: Seq[Expression], joinKeys: 
Seq[Expression]): Boolean = {
-    aggExpression.size == joinKeys.size && aggExpression.zip(joinKeys).forall {
-      case (e1: Expression, e2: Expression) => e1.semanticEquals(e2)
-    }
-  }
-}
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala
index f27039292f..1ad6677172 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala
@@ -1192,28 +1192,6 @@ class VeloxAggregateFunctionsDefaultSuite extends 
VeloxAggregateFunctionsSuite {
       }
     }
   }
-
-  test("aggregate on join keys can set ignoreNullKeys") {
-    val s =
-      """
-        |select count(1) from
-        |  (select l_orderkey, max(l_partkey) from lineitem group by 
l_orderkey) a
-        |inner join
-        |  (select l_orderkey from lineitem) b
-        |on a.l_orderkey = b.l_orderkey
-        |""".stripMargin
-    withSQLConf(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key -> 
"true") {
-      runQueryAndCompare(s) {
-        df =>
-          val executedPlan = getExecutedPlan(df)
-          assert(executedPlan.exists {
-            case a: RegularHashAggregateExecTransformer if a.ignoreNullKeys => 
true
-            case a: FlushableHashAggregateExecTransformer if a.ignoreNullKeys 
=> true
-            case _ => false
-          })
-      }
-    }
-  }
 }
 
 class VeloxAggregateFunctionsFlushSuite extends VeloxAggregateFunctionsSuite {
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc 
b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index ba7a707568..7e4cd1f514 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -462,11 +462,6 @@ core::PlanNodePtr 
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
     preGroupingExprs.insert(preGroupingExprs.begin(), 
veloxGroupingExprs.begin(), veloxGroupingExprs.end());
   }
 
-  if (aggRel.has_advanced_extension() &&
-      SubstraitParser::configSetInOptimization(aggRel.advanced_extension(), 
"ignoreNullKeys=")) {
-    ignoreNullKeys = true;
-  }
-
   // Get the output names of Aggregation.
   std::vector<std::string> aggOutNames;
   aggOutNames.reserve(aggRel.measures().size());
diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md
index 19db11c1d5..160f0c88ae 100644
--- a/docs/velox-configuration.md
+++ b/docs/velox-configuration.md
@@ -49,7 +49,6 @@ nav_order: 16
 | spark.gluten.sql.columnar.backend.velox.memoryUseHugePages                   
    | false             | Use explicit huge pages for Velox memory allocation.  
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | spark.gluten.sql.columnar.backend.velox.orc.scan.enabled                     
    | true              | Enable velox orc scan. If disabled, vanilla spark orc 
scan will be used.                                                              
                                                                                
                                                                                
                                                                                
              [...]
 | spark.gluten.sql.columnar.backend.velox.prefetchRowGroups                    
    | 1                 | Set the prefetch row groups for velox file scan       
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.propagateIgnoreNullKeys              
    | true              | If enabled, we will identify aggregation followed by 
an inner join on the grouping keys, and mark the ignoreNullKeys flag to true to 
avoid unnecessary aggregation on null keys.                                     
                                                                                
                                                                                
               [...]
 | spark.gluten.sql.columnar.backend.velox.queryTraceEnabled                    
    | false             | Enable query tracing flag.                            
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs                     
    | 3600000ms         | The max time in ms to wait for memory reclaim.        
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput           
    | true              | If true, combine small columnar batches together 
before sending to shuffle. The default minimum output batch size is equal to 
0.25 * spark.gluten.sql.columnar.maxBatchSize                                   
                                                                                
                                                                                
                      [...]
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/HashAggregateExecBaseTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/HashAggregateExecBaseTransformer.scala
index 6e2d638b82..a4bcc6081e 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/HashAggregateExecBaseTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/HashAggregateExecBaseTransformer.scala
@@ -39,8 +39,7 @@ abstract class HashAggregateExecBaseTransformer(
     aggregateAttributes: Seq[Attribute],
     initialInputBufferOffset: Int,
     resultExpressions: Seq[NamedExpression],
-    child: SparkPlan,
-    ignoreNullKeys: Boolean = false)
+    child: SparkPlan)
   extends BaseAggregateExec
   with UnaryTransformSupport {
 
@@ -87,13 +86,11 @@ abstract class HashAggregateExecBaseTransformer(
       s"HashAggregateTransformer(keys=$keyString, " +
         s"functions=$functionString, " +
         s"isStreamingAgg=$isCapableForStreamingAggregation, " +
-        s"ignoreNullKeys=$ignoreNullKeys, " +
         s"output=$outputString)"
     } else {
       s"HashAggregateTransformer(keys=$keyString, " +
         s"functions=$functionString, " +
-        s"isStreamingAgg=$isCapableForStreamingAggregation, " +
-        s"ignoreNullKeys=$ignoreNullKeys)"
+        s"isStreamingAgg=$isCapableForStreamingAggregation)"
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to