This is an automated email from the ASF dual-hosted git repository.

rui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 7d11bb6be7 [VL] Support Spark legacy statistical aggregation function 
behavior (#9181)
7d11bb6be7 is described below

commit 7d11bb6be7318e11e9be416108975829ef57d7c1
Author: Jaime Pan <[email protected]>
AuthorDate: Thu Apr 10 17:13:22 2025 +0800

    [VL] Support Spark legacy statistical aggregation function behavior (#9181)
---
 cpp/core/config/GlutenConfig.h                     |  2 ++
 cpp/velox/compute/WholeStageResultIterator.cc      |  3 +++
 .../utils/clickhouse/ClickHouseTestSettings.scala  |  1 +
 .../GlutenSQLAggregateFunctionSuite.scala          | 26 ++++++++++++++++++++++
 .../utils/clickhouse/ClickHouseTestSettings.scala  |  1 +
 .../GlutenSQLAggregateFunctionSuite.scala          | 26 ++++++++++++++++++++++
 .../utils/clickhouse/ClickHouseTestSettings.scala  |  1 +
 .../GlutenSQLAggregateFunctionSuite.scala          | 26 ++++++++++++++++++++++
 .../utils/clickhouse/ClickHouseTestSettings.scala  |  1 +
 .../GlutenSQLAggregateFunctionSuite.scala          | 26 ++++++++++++++++++++++
 .../org/apache/gluten/config/GlutenConfig.scala    |  7 +++++-
 11 files changed, 119 insertions(+), 1 deletion(-)

diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h
index 51efc25b58..456bff7361 100644
--- a/cpp/core/config/GlutenConfig.h
+++ b/cpp/core/config/GlutenConfig.h
@@ -75,6 +75,8 @@ const std::string kSparkLegacyTimeParserPolicy = 
"spark.sql.legacy.timeParserPol
 const std::string kShuffleFileBufferSize = "spark.shuffle.file.buffer";
 const std::string kSparkMapKeyDedupPolicy = "spark.sql.mapKeyDedupPolicy";
 
+const std::string kSparkLegacyStatisticalAggregate = 
"spark.sql.legacy.statisticalAggregate";
+
 std::unordered_map<std::string, std::string>
 parseConfMap(JNIEnv* env, const uint8_t* planData, const int32_t 
planDataLength);
 
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc 
b/cpp/velox/compute/WholeStageResultIterator.cc
index c19d319012..f4bef872c5 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -567,6 +567,9 @@ std::unordered_map<std::string, std::string> 
WholeStageResultIterator::getQueryC
       configs[velox::core::QueryConfig::kThrowExceptionOnDuplicateMapKeys] = 
"false";
     }
 
+    configs[velox::core::QueryConfig::kSparkLegacyStatisticalAggregate] =
+        std::to_string(veloxCfg_->get<bool>(kSparkLegacyStatisticalAggregate, 
false));
+
     const auto setIfExists = [&](const std::string& glutenKey, const 
std::string& veloxKey) {
       const auto valueOptional = veloxCfg_->get<std::string>(glutenKey);
       if (valueOptional.hasValue()) {
diff --git 
a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
 
b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index d569386584..cdd2509680 100644
--- 
a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++ 
b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -985,6 +985,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
     .exclude("Exchange reuse across the whole plan")
   enableSuite[GlutenReuseExchangeAndSubquerySuite]
   enableSuite[GlutenSQLAggregateFunctionSuite]
+    .excludeGlutenTest("Return NaN or null when dividing by zero")
   enableSuite[GlutenSQLWindowFunctionSuite]
     .exclude("window function: partition and order expressions")
     .exclude("window function: expressions in arguments of a window functions")
diff --git 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala
 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala
index 77d4d70cef..0dd56ae157 100644
--- 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala
+++ 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
 import org.apache.gluten.execution.HashAggregateExecBaseTransformer
 
 import org.apache.spark.sql.{GlutenSQLTestsTrait, Row}
+import org.apache.spark.sql.internal.SQLConf
 
 class GlutenSQLAggregateFunctionSuite extends GlutenSQLTestsTrait {
 
@@ -33,4 +34,29 @@ class GlutenSQLAggregateFunctionSuite extends 
GlutenSQLTestsTrait {
     checkAnswer(df, Seq(Row(3, 5)))
     
assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecBaseTransformer])
 == 4)
   }
+
+  testGluten("Return NaN or null when dividing by zero") {
+    val query =
+      """
+        |select skewness(value), kurtosis(value)
+        |from values (1), (1)
+        |AS tab(value)
+        |""".stripMargin
+    val df = sql(query)
+
+    withSQLConf(
+      SQLConf.LEGACY_STATISTICAL_AGGREGATE.key -> "true"
+    ) {
+      checkAnswer(df, Seq(Row(Double.NaN, Double.NaN)))
+      
assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecBaseTransformer])
 == 2)
+    }
+
+    withSQLConf(
+      SQLConf.LEGACY_STATISTICAL_AGGREGATE.key ->
+        SQLConf.LEGACY_STATISTICAL_AGGREGATE.defaultValueString
+    ) {
+      checkAnswer(df, Seq(Row(null, null)))
+      
assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecBaseTransformer])
 == 2)
+    }
+  }
 }
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index 06845c9570..5601c2d321 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -982,6 +982,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
     .excludeGlutenTest("replace partial hash aggregate with sort aggregate")
   enableSuite[GlutenReuseExchangeAndSubquerySuite]
   enableSuite[GlutenSQLAggregateFunctionSuite]
+    .excludeGlutenTest("Return NaN or null when dividing by zero")
   enableSuite[GlutenSQLWindowFunctionSuite]
     .exclude("window function: partition and order expressions")
     .exclude("window function: expressions in arguments of a window functions")
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala
index 77d4d70cef..0dd56ae157 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
 import org.apache.gluten.execution.HashAggregateExecBaseTransformer
 
 import org.apache.spark.sql.{GlutenSQLTestsTrait, Row}
+import org.apache.spark.sql.internal.SQLConf
 
 class GlutenSQLAggregateFunctionSuite extends GlutenSQLTestsTrait {
 
@@ -33,4 +34,29 @@ class GlutenSQLAggregateFunctionSuite extends 
GlutenSQLTestsTrait {
     checkAnswer(df, Seq(Row(3, 5)))
     
assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecBaseTransformer])
 == 4)
   }
+
+  testGluten("Return NaN or null when dividing by zero") {
+    val query =
+      """
+        |select skewness(value), kurtosis(value)
+        |from values (1), (1)
+        |AS tab(value)
+        |""".stripMargin
+    val df = sql(query)
+
+    withSQLConf(
+      SQLConf.LEGACY_STATISTICAL_AGGREGATE.key -> "true"
+    ) {
+      checkAnswer(df, Seq(Row(Double.NaN, Double.NaN)))
+      
assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecBaseTransformer])
 == 2)
+    }
+
+    withSQLConf(
+      SQLConf.LEGACY_STATISTICAL_AGGREGATE.key ->
+        SQLConf.LEGACY_STATISTICAL_AGGREGATE.defaultValueString
+    ) {
+      checkAnswer(df, Seq(Row(null, null)))
+      
assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecBaseTransformer])
 == 2)
+    }
+  }
 }
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index 6d38e85c92..9c0d966ef4 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -844,6 +844,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
     .excludeGlutenTest("replace partial hash aggregate with sort aggregate")
   enableSuite[GlutenReuseExchangeAndSubquerySuite]
   enableSuite[GlutenSQLAggregateFunctionSuite]
+    .excludeGlutenTest("Return NaN or null when dividing by zero")
   enableSuite[GlutenSQLWindowFunctionSuite]
     .exclude("window function: partition and order expressions")
     .exclude("window function: expressions in arguments of a window functions")
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala
index 77d4d70cef..0dd56ae157 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
 import org.apache.gluten.execution.HashAggregateExecBaseTransformer
 
 import org.apache.spark.sql.{GlutenSQLTestsTrait, Row}
+import org.apache.spark.sql.internal.SQLConf
 
 class GlutenSQLAggregateFunctionSuite extends GlutenSQLTestsTrait {
 
@@ -33,4 +34,29 @@ class GlutenSQLAggregateFunctionSuite extends 
GlutenSQLTestsTrait {
     checkAnswer(df, Seq(Row(3, 5)))
     
assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecBaseTransformer])
 == 4)
   }
+
+  testGluten("Return NaN or null when dividing by zero") {
+    val query =
+      """
+        |select skewness(value), kurtosis(value)
+        |from values (1), (1)
+        |AS tab(value)
+        |""".stripMargin
+    val df = sql(query)
+
+    withSQLConf(
+      SQLConf.LEGACY_STATISTICAL_AGGREGATE.key -> "true"
+    ) {
+      checkAnswer(df, Seq(Row(Double.NaN, Double.NaN)))
+      
assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecBaseTransformer])
 == 2)
+    }
+
+    withSQLConf(
+      SQLConf.LEGACY_STATISTICAL_AGGREGATE.key ->
+        SQLConf.LEGACY_STATISTICAL_AGGREGATE.defaultValueString
+    ) {
+      checkAnswer(df, Seq(Row(null, null)))
+      
assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecBaseTransformer])
 == 2)
+    }
+  }
 }
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index caa12f330d..3436be6f85 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -1973,6 +1973,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
   enableSuite[GlutenReuseExchangeAndSubquerySuite]
   enableSuite[GlutenRuntimeNullChecksV2Writes]
   enableSuite[GlutenSQLAggregateFunctionSuite]
+    .excludeGlutenTest("Return NaN or null when dividing by zero")
   enableSuite[GlutenSQLQuerySuite]
     // Decimal precision exceeds.
     .includeCH("should be able to resolve a persistent view")
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala
index 77d4d70cef..0dd56ae157 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
 import org.apache.gluten.execution.HashAggregateExecBaseTransformer
 
 import org.apache.spark.sql.{GlutenSQLTestsTrait, Row}
+import org.apache.spark.sql.internal.SQLConf
 
 class GlutenSQLAggregateFunctionSuite extends GlutenSQLTestsTrait {
 
@@ -33,4 +34,29 @@ class GlutenSQLAggregateFunctionSuite extends 
GlutenSQLTestsTrait {
     checkAnswer(df, Seq(Row(3, 5)))
     
assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecBaseTransformer])
 == 4)
   }
+
+  testGluten("Return NaN or null when dividing by zero") {
+    val query =
+      """
+        |select skewness(value), kurtosis(value)
+        |from values (1), (1)
+        |AS tab(value)
+        |""".stripMargin
+    val df = sql(query)
+
+    withSQLConf(
+      SQLConf.LEGACY_STATISTICAL_AGGREGATE.key -> "true"
+    ) {
+      checkAnswer(df, Seq(Row(Double.NaN, Double.NaN)))
+      
assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecBaseTransformer])
 == 2)
+    }
+
+    withSQLConf(
+      SQLConf.LEGACY_STATISTICAL_AGGREGATE.key ->
+        SQLConf.LEGACY_STATISTICAL_AGGREGATE.defaultValueString
+    ) {
+      checkAnswer(df, Seq(Row(null, null)))
+      
assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecBaseTransformer])
 == 2)
+    }
+  }
 }
diff --git 
a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala 
b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index 029af3229d..67658c0e85 100644
--- a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -462,6 +462,7 @@ object GlutenConfig {
       COLUMNAR_MAX_BATCH_SIZE.key,
       SHUFFLE_WRITER_BUFFER_SIZE.key,
       SQLConf.LEGACY_SIZE_OF_NULL.key,
+      SQLConf.LEGACY_STATISTICAL_AGGREGATE.key,
       "spark.io.compression.codec",
       "spark.sql.decimalOperations.allowPrecisionLoss",
       "spark.gluten.sql.columnar.backend.velox.bloomFilter.expectedNumItems",
@@ -507,6 +508,9 @@ object GlutenConfig {
     val keyWithDefault = ImmutableList.of(
       (SQLConf.CASE_SENSITIVE.key, SQLConf.CASE_SENSITIVE.defaultValueString),
       (SQLConf.IGNORE_MISSING_FILES.key, 
SQLConf.IGNORE_MISSING_FILES.defaultValueString),
+      (
+        SQLConf.LEGACY_STATISTICAL_AGGREGATE.key,
+        SQLConf.LEGACY_STATISTICAL_AGGREGATE.defaultValueString),
       (
         COLUMNAR_MEMORY_BACKTRACE_ALLOCATION.key,
         COLUMNAR_MEMORY_BACKTRACE_ALLOCATION.defaultValueString),
@@ -611,7 +615,8 @@ object GlutenConfig {
       SPARK_OFFHEAP_ENABLED,
       DECIMAL_OPERATIONS_ALLOW_PREC_LOSS.key,
       SPARK_REDACTION_REGEX,
-      LEGACY_TIME_PARSER_POLICY.key
+      LEGACY_TIME_PARSER_POLICY.key,
+      LEGACY_STATISTICAL_AGGREGATE.key
     )
     nativeConfMap.putAll(conf.filter(e => keys.contains(e._1)).asJava)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to