This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 52e6b3402 fix: Add strict floating point mode and fallback to Spark for min/max/sort on floating point inputs when enabled (#2747)
52e6b3402 is described below

commit 52e6b34021b4ee803024eb3e57a35cb00315f10b
Author: Andy Grove <[email protected]>
AuthorDate: Fri Nov 14 10:34:00 2025 -0700

    fix: Add strict floating point mode and fallback to Spark for min/max/sort on floating point inputs when enabled (#2747)
---
 .../main/scala/org/apache/comet/CometConf.scala    |  9 ++++++
 docs/source/user-guide/latest/compatibility.md     |  9 +++---
 docs/source/user-guide/latest/configs.md           |  1 +
 .../org/apache/comet/serde/CometSortOrder.scala    | 11 +++++--
 .../scala/org/apache/comet/serde/aggregates.scala  | 25 ++++++++++++++-
 .../org/apache/spark/sql/comet/operators.scala     |  9 ++++++
 .../org/apache/comet/CometExpressionSuite.scala    | 12 +++++--
 .../apache/comet/exec/CometAggregateSuite.scala    | 37 +++++++++++++++++++++-
 8 files changed, 100 insertions(+), 13 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 5183ca000..53ae060c0 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -674,6 +674,15 @@ object CometConf extends ShimCometConf {
       .booleanConf
       .createWithDefault(false)
 
+  val COMET_EXEC_STRICT_FLOATING_POINT: ConfigEntry[Boolean] =
+    conf("spark.comet.exec.strictFloatingPoint")
+      .category(CATEGORY_EXEC)
+      .doc(
+        "When enabled, fall back to Spark for floating-point operations that 
may differ from " +
+          s"Spark, such as when comparing or sorting -0.0 and 0.0. 
$COMPAT_GUIDE.")
+      .booleanConf
+      .createWithDefault(false)
+
   val COMET_REGEXP_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
     conf("spark.comet.regexp.allowIncompatible")
       .category(CATEGORY_EXEC)
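
For context, a minimal sketch of how the new flag can be turned on from a
Spark session (the key comes from the config entry above; the session setup
itself is illustrative and not part of this change):

    // Enable strict floating-point mode so Comet falls back to Spark for
    // min/max/sort on floating-point inputs (the default is false).
    spark.conf.set("spark.comet.exec.strictFloatingPoint", "true")
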
diff --git a/docs/source/user-guide/latest/compatibility.md b/docs/source/user-guide/latest/compatibility.md
index 6042b4240..c67536c4e 100644
--- a/docs/source/user-guide/latest/compatibility.md
+++ b/docs/source/user-guide/latest/compatibility.md
@@ -47,11 +47,10 @@ Spark normalizes NaN and zero for floating point numbers for several cases. See
 However, one exception is comparison. Spark does not normalize NaN and zero when comparing values
 because they are handled well in Spark (e.g., `SQLOrderingUtil.compareFloats`). But the comparison
 functions of arrow-rs used by DataFusion do not normalize NaN and zero (e.g., [arrow::compute::kernels::cmp::eq](https://docs.rs/arrow/latest/arrow/compute/kernels/cmp/fn.eq.html#)).
-So Comet will add additional normalization expression of NaN and zero for comparison.
-
-Sorting on floating-point data types (or complex types containing floating-point values) is not compatible with
-Spark if the data contains both zero and negative zero. This is likely an edge case that is not of concern for many users
-and sorting on floating-point data can be enabled by setting `spark.comet.expression.SortOrder.allowIncompatible=true`.
+So Comet adds additional normalization expressions for NaN and zero in comparisons, and may still differ
+from Spark in some cases, especially when the data contains both positive and negative zero. This is likely
+an edge case that is not of concern for many users. If it is a concern, setting `spark.comet.exec.strictFloatingPoint=true`
+will make the relevant operations fall back to Spark.
 
 ## Incompatible Expressions
 
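The zero-comparison subtlety described above can be reproduced on the JVM
alone; a small illustrative snippet (plain Scala, not Comet or DataFusion
code):

    // -0.0 and 0.0 are equal under IEEE-754 '==', but a total-order
    // comparison, as used when sorting, can tell them apart.
    val eq = -0.0 == 0.0                          // true
    val cmp = java.lang.Double.compare(-0.0, 0.0) // -1: -0.0 sorts first
    println(s"== gives $eq, compare gives $cmp")
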
diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md
index 468ac5948..8236574b8 100644
--- a/docs/source/user-guide/latest/configs.md
+++ b/docs/source/user-guide/latest/configs.md
@@ -62,6 +62,7 @@ Comet provides the following configuration settings.
 | `spark.comet.exceptionOnDatetimeRebase` | Whether to throw exception when seeing dates/timestamps from the legacy hybrid (Julian + Gregorian) calendar. Since Spark 3, dates/timestamps were written according to the Proleptic Gregorian calendar. When this is true, Comet will throw exceptions when seeing these dates/timestamps that were written by Spark version before 3.0. If this is false, these dates/timestamps will be read as if they were written to the Proleptic Gregorian calendar and [...]
 | `spark.comet.exec.enabled` | Whether to enable Comet native vectorized execution for Spark. This controls whether Spark should convert operators into their Comet counterparts and execute them in native space. Note: each operator is associated with a separate config in the format of `spark.comet.exec.<operator_name>.enabled` at the moment, and both the config and this need to be turned on, in order for the operator to be executed in native. | true |
 | `spark.comet.exec.replaceSortMergeJoin` | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the [Comet Tuning Guide](https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
+| `spark.comet.exec.strictFloatingPoint` | When enabled, fall back to Spark for floating-point operations that may differ from Spark, such as when comparing or sorting -0.0 and 0.0. For more information, refer to the [Comet Compatibility Guide](https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
 | `spark.comet.expression.allowIncompatible` | Comet is not currently fully compatible with Spark for all expressions. Set this config to true to allow them anyway. For more information, refer to the [Comet Compatibility Guide](https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
 | `spark.comet.maxTempDirectorySize` | The maximum amount of data (in bytes) stored inside the temporary directories. | 107374182400b |
 | `spark.comet.metrics.updateInterval` | The interval in milliseconds to update metrics. If interval is negative, metrics will be updated upon task completion. | 3000 |
diff --git a/spark/src/main/scala/org/apache/comet/serde/CometSortOrder.scala b/spark/src/main/scala/org/apache/comet/serde/CometSortOrder.scala
index c1c3cd6d1..aabe34f13 100644
--- a/spark/src/main/scala/org/apache/comet/serde/CometSortOrder.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/CometSortOrder.scala
@@ -41,9 +41,14 @@ object CometSortOrder extends CometExpressionSerde[SortOrder] {
       }
     }
 
-    if (containsFloatingPoint(expr.child.dataType)) {
-      Incompatible(Some(
-        s"Sorting on floating-point is not 100% compatible with Spark. 
${CometConf.COMPAT_GUIDE}"))
+    if (CometConf.COMET_EXEC_STRICT_FLOATING_POINT.get() &&
+      containsFloatingPoint(expr.child.dataType)) {
+      // https://github.com/apache/datafusion-comet/issues/2626
+      Incompatible(
+        Some(
+          "Sorting on floating-point is not 100% compatible with Spark, and 
Comet is running " +
+            s"with ${CometConf.COMET_EXEC_STRICT_FLOATING_POINT.key}=true. " +
+            s"${CometConf.COMPAT_GUIDE}"))
     } else {
       Compatible()
     }
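
containsFloatingPoint is referenced but not shown in this hunk. A
hypothetical sketch of such a recursive type check (names and structure are
assumptions, not the actual Comet implementation):

    import org.apache.spark.sql.types._

    // Hypothetical: detect float/double anywhere in a type, including
    // nested inside arrays, maps, and structs.
    def containsFloatingPoint(dt: DataType): Boolean = dt match {
      case FloatType | DoubleType => true
      case ArrayType(et, _) => containsFloatingPoint(et)
      case MapType(kt, vt, _) =>
        containsFloatingPoint(kt) || containsFloatingPoint(vt)
      case StructType(fields) => fields.exists(f => containsFloatingPoint(f.dataType))
      case _ => false
    }
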
diff --git a/spark/src/main/scala/org/apache/comet/serde/aggregates.scala b/spark/src/main/scala/org/apache/comet/serde/aggregates.scala
index 4b8a74c15..48344e061 100644
--- a/spark/src/main/scala/org/apache/comet/serde/aggregates.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/aggregates.scala
@@ -24,9 +24,10 @@ import scala.jdk.CollectionConverters._
 import org.apache.spark.sql.catalyst.expressions.{Attribute, EvalMode}
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average, BitAndAgg, BitOrAgg, BitXorAgg, BloomFilterAggregate, CentralMomentAgg, Corr, Count, Covariance, CovPopulation, CovSample, First, Last, Max, Min, StddevPop, StddevSamp, Sum, VariancePop, VarianceSamp}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{ByteType, DecimalType, IntegerType, LongType, ShortType, StringType}
+import org.apache.spark.sql.types.{ByteType, DataTypes, DecimalType, IntegerType, LongType, ShortType, StringType}
 
 import org.apache.comet.CometConf
+import org.apache.comet.CometConf.COMET_EXEC_STRICT_FLOATING_POINT
 import org.apache.comet.CometSparkSessionExtensions.withInfo
 import org.apache.comet.serde.QueryPlanSerde.{exprToProto, serializeDataType}
 
@@ -42,6 +43,17 @@ object CometMin extends CometAggregateExpressionSerde[Min] {
       withInfo(aggExpr, s"Unsupported data type: ${expr.dataType}")
       return None
     }
+
+    if (expr.dataType == DataTypes.FloatType || expr.dataType == DataTypes.DoubleType) {
+      if (CometConf.COMET_EXEC_STRICT_FLOATING_POINT.get()) {
+        // https://github.com/apache/datafusion-comet/issues/2448
+        withInfo(
+          aggExpr,
+          s"floating-point not supported when 
${COMET_EXEC_STRICT_FLOATING_POINT.key}=true")
+        return None
+      }
+    }
+
     val child = expr.children.head
     val childExpr = exprToProto(child, inputs, binding)
     val dataType = serializeDataType(expr.dataType)
@@ -78,6 +90,17 @@ object CometMax extends CometAggregateExpressionSerde[Max] {
       withInfo(aggExpr, s"Unsupported data type: ${expr.dataType}")
       return None
     }
+
+    if (expr.dataType == DataTypes.FloatType || expr.dataType == DataTypes.DoubleType) {
+      if (CometConf.COMET_EXEC_STRICT_FLOATING_POINT.get()) {
+        // https://github.com/apache/datafusion-comet/issues/2448
+        withInfo(
+          aggExpr,
+          s"floating-point not supported when 
${COMET_EXEC_STRICT_FLOATING_POINT.key}=true")
+        return None
+      }
+    }
+
     val child = expr.children.head
     val childExpr = exprToProto(child, inputs, binding)
     val dataType = serializeDataType(expr.dataType)
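
Why min/max needs this guard at all: engines can legitimately disagree on
which zero wins. A JVM-only illustration of the two defensible answers
(illustrative, not Comet or DataFusion code):

    // Math.min applies the stricter IEEE-754 total-order rule:
    // negative zero is the smaller zero.
    val strict = math.min(-0.0, 0.0)          // -0.0
    // A naive comparison-based min cannot distinguish the zeros,
    // because -0.0 < 0.0 is false under IEEE-754 numeric comparison.
    val naive = if (-0.0 < 0.0) -0.0 else 0.0 // 0.0
    println(s"strict=$strict naive=$naive")
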
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index b1e437d5b..8b31cc028 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -1044,6 +1044,15 @@ trait CometBaseAggregate {
 
       val aggExprs =
         aggregateExpressions.map(aggExprToProto(_, output, binding, aggregate.conf))
+
+      if (aggExprs.exists(_.isEmpty)) {
+        withInfo(
+          aggregate,
+          "Unsupported aggregate expression(s)",
+          aggregateExpressions ++ aggregateExpressions.map(_.aggregateFunction): _*)
+        return None
+      }
+
       if (childOp.nonEmpty && groupingExprs.forall(_.isDefined) &&
         aggExprs.forall(_.isDefined)) {
         val hashAggBuilder = OperatorOuterClass.HashAggregate.newBuilder()
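
The guard added here follows a common shape: serialize each child into an
Option and bail out, with a reason attached, as soon as any of them is None.
A generic sketch of that pattern (illustrative, not the Comet API):

    // Turn a Seq of Options into an Option of Seq: Some only if every
    // element converted successfully, None otherwise.
    def sequence[A](xs: Seq[Option[A]]): Option[Seq[A]] =
      if (xs.exists(_.isEmpty)) None else Some(xs.flatten)
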
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index fae6bb535..7d2f03513 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -72,7 +72,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       DataGenOptions(generateNegativeZero = true))
     df.createOrReplaceTempView("tbl")
 
-    withSQLConf(CometConf.getExprAllowIncompatConfigKey("SortOrder") -> "false") {
+    withSQLConf(
+      CometConf.getExprAllowIncompatConfigKey("SortOrder") -> "false",
+      CometConf.COMET_EXEC_STRICT_FLOATING_POINT.key -> "true") {
       checkSparkAnswerAndFallbackReasons(
         "select * from tbl order by 1, 2",
         Set(
@@ -94,7 +96,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       DataGenOptions(generateNegativeZero = true))
     df.createOrReplaceTempView("tbl")
 
-    withSQLConf(CometConf.getExprAllowIncompatConfigKey("SortOrder") -> "false") {
+    withSQLConf(
+      CometConf.getExprAllowIncompatConfigKey("SortOrder") -> "false",
+      CometConf.COMET_EXEC_STRICT_FLOATING_POINT.key -> "true") {
       checkSparkAnswerAndFallbackReason(
         "select * from tbl order by 1, 2",
         "unsupported range partitioning sort order")
@@ -118,7 +122,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       DataGenOptions(generateNegativeZero = true))
     df.createOrReplaceTempView("tbl")
 
-    withSQLConf(CometConf.getExprAllowIncompatConfigKey("SortOrder") -> "false") {
+    withSQLConf(
+      CometConf.getExprAllowIncompatConfigKey("SortOrder") -> "false",
+      CometConf.COMET_EXEC_STRICT_FLOATING_POINT.key -> "true") {
       checkSparkAnswerAndFallbackReason(
         "select * from tbl order by 1, 2",
         "unsupported range partitioning sort order")
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
index 5fdd24c46..1a61a0ddb 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
@@ -28,9 +28,11 @@ import org.apache.spark.sql.comet.CometHashAggregateExec
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.functions.{avg, count_distinct, sum}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
 
 import org.apache.comet.CometConf
-import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions}
+import org.apache.comet.CometConf.COMET_EXEC_STRICT_FLOATING_POINT
+import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator, ParquetGenerator, SchemaGenOptions}
 
 /**
  * Test suite dedicated to Comet native aggregate operator
@@ -38,6 +40,39 @@ import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOpti
 class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   import testImplicits._
 
+  test("min/max floating point with negative zero") {
+    val r = new Random(42)
+    val schema = StructType(
+      Seq(
+        StructField("float_col", DataTypes.FloatType, nullable = true),
+        StructField("double_col", DataTypes.DoubleType, nullable = true)))
+    val df = FuzzDataGenerator.generateDataFrame(
+      r,
+      spark,
+      schema,
+      1000,
+      DataGenOptions(generateNegativeZero = true))
+    df.createOrReplaceTempView("tbl")
+
+    for (col <- Seq("float_col", "double_col")) {
+      // assert that data contains positive and negative zero
+      assert(spark.sql(s"select * from tbl where cast($col as string) = '0.0'").count() > 0)
+      assert(spark.sql(s"select * from tbl where cast($col as string) = '-0.0'").count() > 0)
+      for (agg <- Seq("min", "max")) {
+        withSQLConf(COMET_EXEC_STRICT_FLOATING_POINT.key -> "true") {
+          checkSparkAnswerAndFallbackReasons(
+            s"select $agg($col) from tbl where cast($col as string) in ('0.0', 
'-0.0')",
+            Set(
+              "Unsupported aggregate expression(s)",
+              s"floating-point not supported when 
${COMET_EXEC_STRICT_FLOATING_POINT.key}=true"))
+        }
+        checkSparkAnswer(
+          s"select $col, count(*) from tbl " +
+            s"where cast($col as string) in ('0.0', '-0.0') group by $col")
+      }
+    }
+  }
+
   test("avg decimal") {
     withTempDir { dir =>
       val path = new Path(dir.toURI.toString, "test.parquet")
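
To observe the behavior the new test guards, the aggregation can be run with
strict mode toggled; a hedged example against the temp view created in the
test above (illustrative):

    // With strict mode on, Comet falls back to Spark for this min/max
    // instead of potentially returning the other zero.
    spark.conf.set("spark.comet.exec.strictFloatingPoint", "true")
    spark.sql("select min(float_col), max(float_col) from tbl").show()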

