This is an automated email from the ASF dual-hosted git repository.
rui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 7d11bb6be7 [VL] Support Spark legacy statistical aggregation function behavior (#9181)
7d11bb6be7 is described below
commit 7d11bb6be7318e11e9be416108975829ef57d7c1
Author: Jaime Pan <[email protected]>
AuthorDate: Thu Apr 10 17:13:22 2025 +0800
[VL] Support Spark legacy statistical aggregation function behavior (#9181)
---
cpp/core/config/GlutenConfig.h | 2 ++
cpp/velox/compute/WholeStageResultIterator.cc | 3 +++
.../utils/clickhouse/ClickHouseTestSettings.scala | 1 +
.../GlutenSQLAggregateFunctionSuite.scala | 26 ++++++++++++++++++++++
.../utils/clickhouse/ClickHouseTestSettings.scala | 1 +
.../GlutenSQLAggregateFunctionSuite.scala | 26 ++++++++++++++++++++++
.../utils/clickhouse/ClickHouseTestSettings.scala | 1 +
.../GlutenSQLAggregateFunctionSuite.scala | 26 ++++++++++++++++++++++
.../utils/clickhouse/ClickHouseTestSettings.scala | 1 +
.../GlutenSQLAggregateFunctionSuite.scala | 26 ++++++++++++++++++++++
.../org/apache/gluten/config/GlutenConfig.scala | 7 +++++-
11 files changed, 119 insertions(+), 1 deletion(-)
diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h
index 51efc25b58..456bff7361 100644
--- a/cpp/core/config/GlutenConfig.h
+++ b/cpp/core/config/GlutenConfig.h
@@ -75,6 +75,8 @@ const std::string kSparkLegacyTimeParserPolicy = "spark.sql.legacy.timeParserPol
const std::string kShuffleFileBufferSize = "spark.shuffle.file.buffer";
const std::string kSparkMapKeyDedupPolicy = "spark.sql.mapKeyDedupPolicy";
+const std::string kSparkLegacyStatisticalAggregate = "spark.sql.legacy.statisticalAggregate";
+
std::unordered_map<std::string, std::string>
parseConfMap(JNIEnv* env, const uint8_t* planData, const int32_t planDataLength);
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc
index c19d319012..f4bef872c5 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -567,6 +567,9 @@ std::unordered_map<std::string, std::string> WholeStageResultIterator::getQueryC
configs[velox::core::QueryConfig::kThrowExceptionOnDuplicateMapKeys] = "false";
}
+ configs[velox::core::QueryConfig::kSparkLegacyStatisticalAggregate] =
+ std::to_string(veloxCfg_->get<bool>(kSparkLegacyStatisticalAggregate, false));
+
const auto setIfExists = [&](const std::string& glutenKey, const std::string& veloxKey) {
const auto valueOptional = veloxCfg_->get<std::string>(glutenKey);
if (valueOptional.hasValue()) {
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index d569386584..cdd2509680 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -985,6 +985,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("Exchange reuse across the whole plan")
enableSuite[GlutenReuseExchangeAndSubquerySuite]
enableSuite[GlutenSQLAggregateFunctionSuite]
+ .excludeGlutenTest("Return NaN or null when dividing by zero")
enableSuite[GlutenSQLWindowFunctionSuite]
.exclude("window function: partition and order expressions")
.exclude("window function: expressions in arguments of a window functions")
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala
index 77d4d70cef..0dd56ae157 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
import org.apache.gluten.execution.HashAggregateExecBaseTransformer
import org.apache.spark.sql.{GlutenSQLTestsTrait, Row}
+import org.apache.spark.sql.internal.SQLConf
class GlutenSQLAggregateFunctionSuite extends GlutenSQLTestsTrait {
@@ -33,4 +34,29 @@ class GlutenSQLAggregateFunctionSuite extends GlutenSQLTestsTrait {
checkAnswer(df, Seq(Row(3, 5)))
assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecBaseTransformer]) == 4)
}
+
+ testGluten("Return NaN or null when dividing by zero") {
+ val query =
+ """
+ |select skewness(value), kurtosis(value)
+ |from values (1), (1)
+ |AS tab(value)
+ |""".stripMargin
+ val df = sql(query)
+
+ withSQLConf(
+ SQLConf.LEGACY_STATISTICAL_AGGREGATE.key -> "true"
+ ) {
+ checkAnswer(df, Seq(Row(Double.NaN, Double.NaN)))
+ assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecBaseTransformer]) == 2)
+ }
+
+ withSQLConf(
+ SQLConf.LEGACY_STATISTICAL_AGGREGATE.key ->
+ SQLConf.LEGACY_STATISTICAL_AGGREGATE.defaultValueString
+ ) {
+ checkAnswer(df, Seq(Row(null, null)))
+ assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecBaseTransformer]) == 2)
+ }
+ }
}
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index 06845c9570..5601c2d321 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -982,6 +982,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
.excludeGlutenTest("replace partial hash aggregate with sort aggregate")
enableSuite[GlutenReuseExchangeAndSubquerySuite]
enableSuite[GlutenSQLAggregateFunctionSuite]
+ .excludeGlutenTest("Return NaN or null when dividing by zero")
enableSuite[GlutenSQLWindowFunctionSuite]
.exclude("window function: partition and order expressions")
.exclude("window function: expressions in arguments of a window functions")
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala
index 77d4d70cef..0dd56ae157 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
import org.apache.gluten.execution.HashAggregateExecBaseTransformer
import org.apache.spark.sql.{GlutenSQLTestsTrait, Row}
+import org.apache.spark.sql.internal.SQLConf
class GlutenSQLAggregateFunctionSuite extends GlutenSQLTestsTrait {
@@ -33,4 +34,29 @@ class GlutenSQLAggregateFunctionSuite extends GlutenSQLTestsTrait {
checkAnswer(df, Seq(Row(3, 5)))
assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecBaseTransformer]) == 4)
}
+
+ testGluten("Return NaN or null when dividing by zero") {
+ val query =
+ """
+ |select skewness(value), kurtosis(value)
+ |from values (1), (1)
+ |AS tab(value)
+ |""".stripMargin
+ val df = sql(query)
+
+ withSQLConf(
+ SQLConf.LEGACY_STATISTICAL_AGGREGATE.key -> "true"
+ ) {
+ checkAnswer(df, Seq(Row(Double.NaN, Double.NaN)))
+ assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecBaseTransformer]) == 2)
+ }
+
+ withSQLConf(
+ SQLConf.LEGACY_STATISTICAL_AGGREGATE.key ->
+ SQLConf.LEGACY_STATISTICAL_AGGREGATE.defaultValueString
+ ) {
+ checkAnswer(df, Seq(Row(null, null)))
+ assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecBaseTransformer]) == 2)
+ }
+ }
}
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index 6d38e85c92..9c0d966ef4 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -844,6 +844,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
.excludeGlutenTest("replace partial hash aggregate with sort aggregate")
enableSuite[GlutenReuseExchangeAndSubquerySuite]
enableSuite[GlutenSQLAggregateFunctionSuite]
+ .excludeGlutenTest("Return NaN or null when dividing by zero")
enableSuite[GlutenSQLWindowFunctionSuite]
.exclude("window function: partition and order expressions")
.exclude("window function: expressions in arguments of a window functions")
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala
index 77d4d70cef..0dd56ae157 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
import org.apache.gluten.execution.HashAggregateExecBaseTransformer
import org.apache.spark.sql.{GlutenSQLTestsTrait, Row}
+import org.apache.spark.sql.internal.SQLConf
class GlutenSQLAggregateFunctionSuite extends GlutenSQLTestsTrait {
@@ -33,4 +34,29 @@ class GlutenSQLAggregateFunctionSuite extends GlutenSQLTestsTrait {
checkAnswer(df, Seq(Row(3, 5)))
assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecBaseTransformer]) == 4)
}
+
+ testGluten("Return NaN or null when dividing by zero") {
+ val query =
+ """
+ |select skewness(value), kurtosis(value)
+ |from values (1), (1)
+ |AS tab(value)
+ |""".stripMargin
+ val df = sql(query)
+
+ withSQLConf(
+ SQLConf.LEGACY_STATISTICAL_AGGREGATE.key -> "true"
+ ) {
+ checkAnswer(df, Seq(Row(Double.NaN, Double.NaN)))
+ assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecBaseTransformer]) == 2)
+ }
+
+ withSQLConf(
+ SQLConf.LEGACY_STATISTICAL_AGGREGATE.key ->
+ SQLConf.LEGACY_STATISTICAL_AGGREGATE.defaultValueString
+ ) {
+ checkAnswer(df, Seq(Row(null, null)))
+ assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecBaseTransformer]) == 2)
+ }
+ }
}
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index caa12f330d..3436be6f85 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -1973,6 +1973,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
enableSuite[GlutenReuseExchangeAndSubquerySuite]
enableSuite[GlutenRuntimeNullChecksV2Writes]
enableSuite[GlutenSQLAggregateFunctionSuite]
+ .excludeGlutenTest("Return NaN or null when dividing by zero")
enableSuite[GlutenSQLQuerySuite]
// Decimal precision exceeds.
.includeCH("should be able to resolve a persistent view")
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala
index 77d4d70cef..0dd56ae157 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
import org.apache.gluten.execution.HashAggregateExecBaseTransformer
import org.apache.spark.sql.{GlutenSQLTestsTrait, Row}
+import org.apache.spark.sql.internal.SQLConf
class GlutenSQLAggregateFunctionSuite extends GlutenSQLTestsTrait {
@@ -33,4 +34,29 @@ class GlutenSQLAggregateFunctionSuite extends GlutenSQLTestsTrait {
checkAnswer(df, Seq(Row(3, 5)))
assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecBaseTransformer]) == 4)
}
+
+ testGluten("Return NaN or null when dividing by zero") {
+ val query =
+ """
+ |select skewness(value), kurtosis(value)
+ |from values (1), (1)
+ |AS tab(value)
+ |""".stripMargin
+ val df = sql(query)
+
+ withSQLConf(
+ SQLConf.LEGACY_STATISTICAL_AGGREGATE.key -> "true"
+ ) {
+ checkAnswer(df, Seq(Row(Double.NaN, Double.NaN)))
+ assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecBaseTransformer]) == 2)
+ }
+
+ withSQLConf(
+ SQLConf.LEGACY_STATISTICAL_AGGREGATE.key ->
+ SQLConf.LEGACY_STATISTICAL_AGGREGATE.defaultValueString
+ ) {
+ checkAnswer(df, Seq(Row(null, null)))
+ assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecBaseTransformer]) == 2)
+ }
+ }
}
diff --git a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index 029af3229d..67658c0e85 100644
--- a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -462,6 +462,7 @@ object GlutenConfig {
COLUMNAR_MAX_BATCH_SIZE.key,
SHUFFLE_WRITER_BUFFER_SIZE.key,
SQLConf.LEGACY_SIZE_OF_NULL.key,
+ SQLConf.LEGACY_STATISTICAL_AGGREGATE.key,
"spark.io.compression.codec",
"spark.sql.decimalOperations.allowPrecisionLoss",
"spark.gluten.sql.columnar.backend.velox.bloomFilter.expectedNumItems",
@@ -507,6 +508,9 @@ object GlutenConfig {
val keyWithDefault = ImmutableList.of(
(SQLConf.CASE_SENSITIVE.key, SQLConf.CASE_SENSITIVE.defaultValueString),
(SQLConf.IGNORE_MISSING_FILES.key, SQLConf.IGNORE_MISSING_FILES.defaultValueString),
+ (
+ SQLConf.LEGACY_STATISTICAL_AGGREGATE.key,
+ SQLConf.LEGACY_STATISTICAL_AGGREGATE.defaultValueString),
(
COLUMNAR_MEMORY_BACKTRACE_ALLOCATION.key,
COLUMNAR_MEMORY_BACKTRACE_ALLOCATION.defaultValueString),
@@ -611,7 +615,8 @@ object GlutenConfig {
SPARK_OFFHEAP_ENABLED,
DECIMAL_OPERATIONS_ALLOW_PREC_LOSS.key,
SPARK_REDACTION_REGEX,
- LEGACY_TIME_PARSER_POLICY.key
+ LEGACY_TIME_PARSER_POLICY.key,
+ LEGACY_STATISTICAL_AGGREGATE.key
)
nativeConfMap.putAll(conf.filter(e => keys.contains(e._1)).asJava)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]