This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new f667e81df3 [VL] Disable FlushableHashAggregate when aggregates contain 
sum/avg for floating type (#8986)
f667e81df3 is described below

commit f667e81df31ca2c02afbcdc2b0b72cd12cc7d8c5
Author: zhaokuo <[email protected]>
AuthorDate: Thu Apr 10 23:46:21 2025 +0800

    [VL] Disable FlushableHashAggregate when aggregates contain sum/avg for 
floating type (#8986)
---
 .../org/apache/gluten/config/VeloxConfig.scala     | 26 +++++++++++
 .../extension/FlushableHashAggregateRule.scala     | 26 ++++++++++-
 .../execution/VeloxAggregateFunctionsSuite.scala   | 54 ++++++++++++++++++++++
 cpp/velox/compute/WholeStageResultIterator.cc      |  5 +-
 cpp/velox/config/VeloxConfig.h                     |  1 +
 5 files changed, 109 insertions(+), 3 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala 
b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
index 6fa91364f6..220f3777c1 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
@@ -63,6 +63,8 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {
 
   def enablePropagateIgnoreNullKeys: Boolean =
     getConf(VELOX_PROPAGATE_IGNORE_NULL_KEYS_ENABLED)
+
+  def floatingPointMode: String = getConf(FLOATING_POINT_MODE)
 }
 
 object VeloxConfig {
@@ -314,6 +316,19 @@ object VeloxConfig {
       .booleanConf
       .createWithDefault(true)
 
+  val MAX_PARTIAL_AGGREGATION_MEMORY =
+    
buildConf("spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemory")
+      .internal()
+      .doc(
+        "Set the max memory of partial aggregation in bytes. When this option 
is set to a " +
+          "value greater than 0, it will override 
spark.gluten.sql.columnar.backend.velox." +
+          "maxPartialAggregationMemoryRatio. Note: this option only works when 
flushable " +
+          "partial aggregation is enabled. Ignored when 
spark.gluten.sql.columnar.backend." +
+          "velox.flushablePartialAggregation=false."
+      )
+      .bytesConf(ByteUnit.BYTE)
+      .createOptional
+
   val MAX_PARTIAL_AGGREGATION_MEMORY_RATIO =
     
buildConf("spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio")
       .internal()
@@ -539,4 +554,15 @@ object VeloxConfig {
           "avoid unnecessary aggregation on null keys.")
       .booleanConf
       .createWithDefault(true)
+
+  val FLOATING_POINT_MODE =
+    buildConf("spark.gluten.sql.columnar.backend.velox.floatingPointMode")
+      .doc(
+        "Config used to control the tolerance of floating point operations 
alignment with Spark. " +
+          "When the mode is set to strict, flushing is disabled for 
sum(float/double) " +
+          "and avg(float/double). When set to loose, flushing will be 
enabled.")
+      .internal()
+      .stringConf
+      .checkValues(Set("loose", "strict"))
+      .createWithDefault("loose")
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/extension/FlushableHashAggregateRule.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/extension/FlushableHashAggregateRule.scala
index 8caddbe17f..0aa48d8d37 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/extension/FlushableHashAggregateRule.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/extension/FlushableHashAggregateRule.scala
@@ -20,12 +20,13 @@ import org.apache.gluten.config.VeloxConfig
 import org.apache.gluten.execution._
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.aggregate.{Partial, 
PartialMerge}
+import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.physical.ClusteredDistribution
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.EXCHANGE
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
+import org.apache.spark.sql.types.{DataType, DoubleType, FloatType}
 
 /**
  * To transform regular aggregation to intermediate aggregation that 
internally enables
@@ -61,6 +62,26 @@ case class FlushableHashAggregateRule(session: SparkSession) 
extends Rule[SparkP
     }
   }
 
+  private def aggregatesNotSupportFlush(aggExprs: Seq[AggregateExpression]): 
Boolean = {
+    if (VeloxConfig.get.floatingPointMode == "loose") {
+      return false
+    }
+
+    def isFloatingPointType(dataType: DataType): Boolean = {
+      dataType == DoubleType || dataType == FloatType
+    }
+
+    def isUnsupportedAggregation(aggExpr: AggregateExpression): Boolean = {
+      aggExpr.aggregateFunction match {
+        case Sum(child, _) if isFloatingPointType(child.dataType) => true
+        case Average(child, _) if isFloatingPointType(child.dataType) => true
+        case _ => false
+      }
+    }
+
+    aggExprs.exists(isUnsupportedAggregation)
+  }
+
   private def replaceEligibleAggregates(plan: SparkPlan)(
       func: RegularHashAggregateExecTransformer => SparkPlan): SparkPlan = {
     def transformDown: SparkPlan => SparkPlan = {
@@ -72,6 +93,9 @@ case class FlushableHashAggregateRule(session: SparkSession) 
extends Rule[SparkP
           if isAggInputAlreadyDistributedWithAggKeys(agg) =>
         // Data already grouped by aggregate keys, Skip.
         agg
+      case agg: RegularHashAggregateExecTransformer
+          if aggregatesNotSupportFlush(agg.aggregateExpressions) =>
+        agg
       case agg: RegularHashAggregateExecTransformer =>
         func(agg)
       case p if !canPropagate(p) => p
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala
index a2117a78aa..b3094ef235 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala
@@ -1273,6 +1273,60 @@ class VeloxAggregateFunctionsFlushSuite extends 
VeloxAggregateFunctionsSuite {
       }
     }
   }
+
+  test("flushable aggregate rule - double sum when floatingPointMode is 
strict") {
+    withSQLConf(
+      "spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemory" -> 
"100",
+      "spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput" -> 
"false",
+      "spark.gluten.sql.columnar.maxBatchSize" -> "2",
+      "spark.gluten.sql.columnar.backend.velox.floatingPointMode" -> "strict"
+    ) {
+      withTempView("t1") {
+        import testImplicits._
+        Seq((24.621d, 1), (12.14d, 1), (0.169d, 1), (6.865d, 1), (1.879d, 1), 
(16.326d, 1))
+          .toDF("c1", "c2")
+          .createOrReplaceTempView("t1")
+        runQueryAndCompare("select c2, cast(sum(c1) as bigint) from t1 group 
by c2") {
+          df =>
+            {
+              assert(
+                getExecutedPlan(df).count(
+                  plan => {
+                    plan.isInstanceOf[RegularHashAggregateExecTransformer]
+                  }) == 2)
+            }
+        }
+      }
+    }
+  }
+
+  test("flushable aggregate rule - double sum when floatingPointMode is 
loose") {
+    withSQLConf(
+      "spark.gluten.sql.columnar.backend.velox.floatingPointMode" -> "loose"
+    ) {
+      withTempView("t1") {
+        import testImplicits._
+        Seq((24.6d, 1), (12.1d, 1), (0.1d, 1), (6.8d, 1), (1.8d, 1), (16.3d, 
1))
+          .toDF("c1", "c2")
+          .createOrReplaceTempView("t1")
+        runQueryAndCompare("select c2, cast(sum(c1) as bigint) from t1 group 
by c2") {
+          df =>
+            {
+              assert(
+                getExecutedPlan(df).count(
+                  plan => {
+                    plan.isInstanceOf[RegularHashAggregateExecTransformer]
+                  }) == 1)
+              assert(
+                getExecutedPlan(df).count(
+                  plan => {
+                    plan.isInstanceOf[FlushableHashAggregateExecTransformer]
+                  }) == 1)
+            }
+        }
+      }
+    }
+  }
 }
 
 object VeloxAggregateFunctionsSuite {
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc 
b/cpp/velox/compute/WholeStageResultIterator.cc
index f4bef872c5..d8ac5165d7 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -486,8 +486,9 @@ std::unordered_map<std::string, std::string> 
WholeStageResultIterator::getQueryC
       // FIXME this uses process-wise off-heap memory which is not for task
       // partial aggregation memory config
       auto offHeapMemory = veloxCfg_->get<int64_t>(kSparkTaskOffHeapMemory, 
facebook::velox::memory::kMaxMemory);
-      auto maxPartialAggregationMemory =
-          
static_cast<long>((veloxCfg_->get<double>(kMaxPartialAggregationMemoryRatio, 
0.1) * offHeapMemory));
+      auto maxPartialAggregationMemory = 
veloxCfg_->get<int64_t>(kMaxPartialAggregationMemory).has_value()
+          ? veloxCfg_->get<int64_t>(kMaxPartialAggregationMemory).value()
+          : 
static_cast<int64_t>((veloxCfg_->get<double>(kMaxPartialAggregationMemoryRatio, 
0.1) * offHeapMemory));
       auto maxExtendedPartialAggregationMemory =
           
static_cast<long>((veloxCfg_->get<double>(kMaxExtendedPartialAggregationMemoryRatio,
 0.15) * offHeapMemory));
       configs[velox::core::QueryConfig::kMaxPartialAggregationMemory] = 
std::to_string(maxPartialAggregationMemory);
diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h
index cb3f891072..2b2b4029ff 100644
--- a/cpp/velox/config/VeloxConfig.h
+++ b/cpp/velox/config/VeloxConfig.h
@@ -52,6 +52,7 @@ const std::string kCompressionKind = 
"spark.io.compression.codec";
 const std::string kSpillCompressionKind = 
"spark.gluten.sql.columnar.backend.velox.spillCompressionCodec";
 const std::string kMaxPartialAggregationMemoryRatio =
     "spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio";
+const std::string kMaxPartialAggregationMemory = 
"spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemory";
 const std::string kMaxExtendedPartialAggregationMemoryRatio =
     
"spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio";
 const std::string kAbandonPartialAggregationMinPct =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to