This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new f667e81df3 [VL] Disable FlushableHashAggregate when aggregates contain
sum/avg for floating-point types (#8986)
f667e81df3 is described below
commit f667e81df31ca2c02afbcdc2b0b72cd12cc7d8c5
Author: zhaokuo <[email protected]>
AuthorDate: Thu Apr 10 23:46:21 2025 +0800
[VL] Disable FlushableHashAggregate when aggregates contain sum/avg for
floating-point types (#8986)
---
.../org/apache/gluten/config/VeloxConfig.scala | 26 +++++++++++
.../extension/FlushableHashAggregateRule.scala | 26 ++++++++++-
.../execution/VeloxAggregateFunctionsSuite.scala | 54 ++++++++++++++++++++++
cpp/velox/compute/WholeStageResultIterator.cc | 5 +-
cpp/velox/config/VeloxConfig.h | 1 +
5 files changed, 109 insertions(+), 3 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
index 6fa91364f6..220f3777c1 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
@@ -63,6 +63,8 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {
def enablePropagateIgnoreNullKeys: Boolean =
getConf(VELOX_PROPAGATE_IGNORE_NULL_KEYS_ENABLED)
+
+ /** Floating-point alignment mode: "loose" (default) or "strict"; read from FLOATING_POINT_MODE. */
+ def floatingPointMode: String = getConf(FLOATING_POINT_MODE)
}
object VeloxConfig {
@@ -314,6 +316,19 @@ object VeloxConfig {
.booleanConf
.createWithDefault(true)
+  /**
+   * Optional absolute cap, in bytes, on partial aggregation memory. When present, the native
+   * side uses it instead of deriving the cap from maxPartialAggregationMemoryRatio.
+   */
+  // NOTE(review): the native side reads this key with get<int64_t>; confirm that
+  // size-suffixed values such as "64m" are normalized to plain bytes before reaching it.
+  val MAX_PARTIAL_AGGREGATION_MEMORY =
+    buildConf("spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemory")
+      .internal()
+      .doc(
+        "Set the max memory of partial aggregation in bytes. When this option is set, it " +
+          "overrides spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio. " +
+          "Note: this option only works when flushable partial aggregation is enabled. " +
+          "Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false."
+      )
+      .bytesConf(ByteUnit.BYTE)
+      .createOptional
+
val MAX_PARTIAL_AGGREGATION_MEMORY_RATIO =
buildConf("spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio")
.internal()
@@ -539,4 +554,15 @@ object VeloxConfig {
"avoid unnecessary aggregation on null keys.")
.booleanConf
.createWithDefault(true)
+
+  /**
+   * Controls how strictly floating-point aggregation results must align with vanilla Spark.
+   * In "strict" mode, aggregates containing sum/avg over float/double input are kept as
+   * regular (non-flushable) hash aggregates; "loose" (the default) leaves flushing enabled.
+   */
+  val FLOATING_POINT_MODE =
+    buildConf("spark.gluten.sql.columnar.backend.velox.floatingPointMode")
+      .doc(
+        "Config used to control the tolerance of floating point operations alignment with " +
+          "Spark. When the mode is set to strict, flushing is disabled for sum(float/double) " +
+          "and avg(float/double). When set to loose, flushing will be enabled.")
+      .internal()
+      .stringConf
+      .checkValues(Set("loose", "strict"))
+      .createWithDefault("loose")
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/extension/FlushableHashAggregateRule.scala
b/backends-velox/src/main/scala/org/apache/gluten/extension/FlushableHashAggregateRule.scala
index 8caddbe17f..0aa48d8d37 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/extension/FlushableHashAggregateRule.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/extension/FlushableHashAggregateRule.scala
@@ -20,12 +20,13 @@ import org.apache.gluten.config.VeloxConfig
import org.apache.gluten.execution._
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.aggregate.{Partial,
PartialMerge}
+import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.physical.ClusteredDistribution
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.EXCHANGE
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
+import org.apache.spark.sql.types.{DataType, DoubleType, FloatType}
/**
* To transform regular aggregation to intermediate aggregation that
internally enables
@@ -61,6 +62,26 @@ case class FlushableHashAggregateRule(session: SparkSession)
extends Rule[SparkP
}
}
+  /**
+   * Returns true when an aggregate must not be made flushable: floatingPointMode is not
+   * "loose" and at least one aggregate expression is sum/avg over float or double input.
+   *
+   * @param aggExprs the aggregate expressions of the candidate aggregate operator
+   * @return true to keep the regular (non-flushable) aggregate
+   */
+  private def aggregatesNotSupportFlush(aggExprs: Seq[AggregateExpression]): Boolean = {
+    def isFloatingPointType(dataType: DataType): Boolean =
+      dataType == DoubleType || dataType == FloatType
+
+    // Typed patterns instead of constructor extractors: the trailing constructor parameter of
+    // Sum/Average (failOnError / evalMode) has changed between Spark releases.
+    def isUnsupportedAggregation(aggExpr: AggregateExpression): Boolean =
+      aggExpr.aggregateFunction match {
+        case sum: Sum => isFloatingPointType(sum.child.dataType)
+        case avg: Average => isFloatingPointType(avg.child.dataType)
+        case _ => false
+      }
+
+    // "loose" mode never blocks flushing; evaluated first so the scan is skipped in that case.
+    VeloxConfig.get.floatingPointMode != "loose" && aggExprs.exists(isUnsupportedAggregation)
+  }
+
private def replaceEligibleAggregates(plan: SparkPlan)(
func: RegularHashAggregateExecTransformer => SparkPlan): SparkPlan = {
def transformDown: SparkPlan => SparkPlan = {
@@ -72,6 +93,9 @@ case class FlushableHashAggregateRule(session: SparkSession)
extends Rule[SparkP
if isAggInputAlreadyDistributedWithAggKeys(agg) =>
// Data already grouped by aggregate keys, Skip.
agg
+ case agg: RegularHashAggregateExecTransformer
+ if aggregatesNotSupportFlush(agg.aggregateExpressions) =>
+ agg
case agg: RegularHashAggregateExecTransformer =>
func(agg)
case p if !canPropagate(p) => p
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala
index a2117a78aa..b3094ef235 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala
@@ -1273,6 +1273,60 @@ class VeloxAggregateFunctionsFlushSuite extends
VeloxAggregateFunctionsSuite {
}
}
}
+
+  // Strict mode: a floating-point sum must not be planned with a flushable aggregate.
+  test("flushable aggregate rule - double sum when floatingPointMode is strict") {
+    withSQLConf(
+      // NOTE(review): tiny memory cap and batch size presumably make flushing observable
+      // if a flushable aggregate were (wrongly) planned.
+      "spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemory" -> "100",
+      "spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput" -> "false",
+      "spark.gluten.sql.columnar.maxBatchSize" -> "2",
+      "spark.gluten.sql.columnar.backend.velox.floatingPointMode" -> "strict"
+    ) {
+      withTempView("t1") {
+        import testImplicits._
+        Seq((24.621d, 1), (12.14d, 1), (0.169d, 1), (6.865d, 1), (1.879d, 1), (16.326d, 1))
+          .toDF("c1", "c2")
+          .createOrReplaceTempView("t1")
+        runQueryAndCompare("select c2, cast(sum(c1) as bigint) from t1 group by c2") {
+          df =>
+            val plan = getExecutedPlan(df)
+            // Both aggregates stay regular and none is replaced by a flushable one.
+            assert(plan.count(_.isInstanceOf[RegularHashAggregateExecTransformer]) == 2)
+            assert(plan.count(_.isInstanceOf[FlushableHashAggregateExecTransformer]) == 0)
+        }
+      }
+    }
+  }
+
+  // Loose mode: flushing stays enabled even for a floating-point sum.
+  test("flushable aggregate rule - double sum when floatingPointMode is loose") {
+    withSQLConf("spark.gluten.sql.columnar.backend.velox.floatingPointMode" -> "loose") {
+      withTempView("t1") {
+        import testImplicits._
+        val rows = Seq((24.6d, 1), (12.1d, 1), (0.1d, 1), (6.8d, 1), (1.8d, 1), (16.3d, 1))
+        rows.toDF("c1", "c2").createOrReplaceTempView("t1")
+        runQueryAndCompare("select c2, cast(sum(c1) as bigint) from t1 group by c2") {
+          df =>
+            val executed = getExecutedPlan(df)
+            // One aggregate stays regular; the other is replaced by a flushable aggregate.
+            val regularAggs = executed.count(_.isInstanceOf[RegularHashAggregateExecTransformer])
+            val flushableAggs =
+              executed.count(_.isInstanceOf[FlushableHashAggregateExecTransformer])
+            assert(regularAggs == 1)
+            assert(flushableAggs == 1)
+        }
+      }
+    }
+  }
}
object VeloxAggregateFunctionsSuite {
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc
b/cpp/velox/compute/WholeStageResultIterator.cc
index f4bef872c5..d8ac5165d7 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -486,8 +486,9 @@ std::unordered_map<std::string, std::string>
WholeStageResultIterator::getQueryC
// FIXME this uses process-wise off-heap memory which is not for task
// partial aggregation memory config
auto offHeapMemory = veloxCfg_->get<int64_t>(kSparkTaskOffHeapMemory,
facebook::velox::memory::kMaxMemory);
- auto maxPartialAggregationMemory =
-
static_cast<long>((veloxCfg_->get<double>(kMaxPartialAggregationMemoryRatio,
0.1) * offHeapMemory));
+ auto maxPartialAggregationMemory =
veloxCfg_->get<int64_t>(kMaxPartialAggregationMemory).has_value()
+ ? veloxCfg_->get<int64_t>(kMaxPartialAggregationMemory).value()
+ :
static_cast<int64_t>((veloxCfg_->get<double>(kMaxPartialAggregationMemoryRatio,
0.1) * offHeapMemory));
auto maxExtendedPartialAggregationMemory =
static_cast<long>((veloxCfg_->get<double>(kMaxExtendedPartialAggregationMemoryRatio,
0.15) * offHeapMemory));
configs[velox::core::QueryConfig::kMaxPartialAggregationMemory] =
std::to_string(maxPartialAggregationMemory);
diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h
index cb3f891072..2b2b4029ff 100644
--- a/cpp/velox/config/VeloxConfig.h
+++ b/cpp/velox/config/VeloxConfig.h
@@ -52,6 +52,7 @@ const std::string kCompressionKind =
"spark.io.compression.codec";
const std::string kSpillCompressionKind =
"spark.gluten.sql.columnar.backend.velox.spillCompressionCodec";
const std::string kMaxPartialAggregationMemoryRatio =
"spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio";
+// Optional absolute limit, in bytes, on partial aggregation memory. When present it takes
+// precedence over kMaxPartialAggregationMemoryRatio (see WholeStageResultIterator.cc).
+const std::string kMaxPartialAggregationMemory =
"spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemory";
const std::string kMaxExtendedPartialAggregationMemoryRatio =
"spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio";
const std::string kAbandonPartialAggregationMinPct =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]