This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 52e6b3402 fix: Add strict floating point mode and fallback to Spark for min/max/sort on floating point inputs when enabled (#2747)
52e6b3402 is described below
commit 52e6b34021b4ee803024eb3e57a35cb00315f10b
Author: Andy Grove <[email protected]>
AuthorDate: Fri Nov 14 10:34:00 2025 -0700
fix: Add strict floating point mode and fallback to Spark for min/max/sort on floating point inputs when enabled (#2747)
---
.../main/scala/org/apache/comet/CometConf.scala | 9 ++++++
docs/source/user-guide/latest/compatibility.md | 9 +++---
docs/source/user-guide/latest/configs.md | 1 +
.../org/apache/comet/serde/CometSortOrder.scala | 11 +++++--
.../scala/org/apache/comet/serde/aggregates.scala | 25 ++++++++++++++-
.../org/apache/spark/sql/comet/operators.scala | 9 ++++++
.../org/apache/comet/CometExpressionSuite.scala | 12 +++++--
.../apache/comet/exec/CometAggregateSuite.scala | 37 +++++++++++++++++++++-
8 files changed, 100 insertions(+), 13 deletions(-)
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 5183ca000..53ae060c0 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -674,6 +674,15 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)
+ val COMET_EXEC_STRICT_FLOATING_POINT: ConfigEntry[Boolean] =
+ conf("spark.comet.exec.strictFloatingPoint")
+ .category(CATEGORY_EXEC)
+ .doc(
+ "When enabled, fall back to Spark for floating-point operations that
may differ from " +
+ s"Spark, such as when comparing or sorting -0.0 and 0.0.
$COMPAT_GUIDE.")
+ .booleanConf
+ .createWithDefault(false)
+
val COMET_REGEXP_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
conf("spark.comet.regexp.allowIncompatible")
.category(CATEGORY_EXEC)
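For anyone trying the new flag, here is a minimal sketch of enabling it on a Spark session. Only the config key `spark.comet.exec.strictFloatingPoint` comes from this commit; the plugin class and app name are ordinary Comet session setup, shown as illustrative assumptions:

    // Sketch: start a Comet-enabled session with strict floating-point mode on.
    // Only spark.comet.exec.strictFloatingPoint comes from this commit.
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("strict-fp-demo")
      .config("spark.plugins", "org.apache.spark.CometPlugin")
      .config("spark.comet.exec.strictFloatingPoint", "true")
      .getOrCreate()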
diff --git a/docs/source/user-guide/latest/compatibility.md b/docs/source/user-guide/latest/compatibility.md
index 6042b4240..c67536c4e 100644
--- a/docs/source/user-guide/latest/compatibility.md
+++ b/docs/source/user-guide/latest/compatibility.md
@@ -47,11 +47,10 @@ Spark normalizes NaN and zero for floating point numbers for several cases. See
However, one exception is comparison. Spark does not normalize NaN and zero when comparing values
because they are handled well in Spark (e.g., `SQLOrderingUtil.compareFloats`). But the comparison
functions of arrow-rs used by DataFusion do not normalize NaN and zero (e.g., [arrow::compute::kernels::cmp::eq](https://docs.rs/arrow/latest/arrow/compute/kernels/cmp/fn.eq.html#)).
-So Comet will add additional normalization expression of NaN and zero for comparison.
-
-Sorting on floating-point data types (or complex types containing floating-point values) is not compatible with
-Spark if the data contains both zero and negative zero. This is likely an edge case that is not of concern for many users
-and sorting on floating-point data can be enabled by setting `spark.comet.expression.SortOrder.allowIncompatible=true`.
+So Comet adds additional normalization expressions for NaN and zero in comparisons, and may still differ
+from Spark in some cases, especially when the data contains both positive and negative zero. This is likely an edge
+case that is not of concern for many users. If it is a concern, setting `spark.comet.exec.strictFloatingPoint=true`
+will make the relevant operations fall back to Spark.
## Incompatible Expressions
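To make the edge case concrete, a standard-library illustration (not from the commit): IEEE 754 equality treats the two zeros as equal, while the total ordering used for sorting distinguishes them, which is exactly where engines can disagree.

    // -0.0 and 0.0 are equal under IEEE 754 equality...
    println(-0.0 == 0.0)                         // true
    // ...but the total ordering used when sorting separates them.
    println(java.lang.Double.compare(-0.0, 0.0)) // -1: -0.0 sorts before 0.0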
diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md
index 468ac5948..8236574b8 100644
--- a/docs/source/user-guide/latest/configs.md
+++ b/docs/source/user-guide/latest/configs.md
@@ -62,6 +62,7 @@ Comet provides the following configuration settings.
| `spark.comet.exceptionOnDatetimeRebase` | Whether to throw exception when seeing dates/timestamps from the legacy hybrid (Julian + Gregorian) calendar. Since Spark 3, dates/timestamps were written according to the Proleptic Gregorian calendar. When this is true, Comet will throw exceptions when seeing these dates/timestamps that were written by Spark version before 3.0. If this is false, these dates/timestamps will be read as if they were written to the Proleptic Gregorian calendar and [...]
| `spark.comet.exec.enabled` | Whether to enable Comet native vectorized execution for Spark. This controls whether Spark should convert operators into their Comet counterparts and execute them in native space. Note: each operator is associated with a separate config in the format of `spark.comet.exec.<operator_name>.enabled` at the moment, and both the config and this need to be turned on, in order for the operator to be executed in native. | true |
| `spark.comet.exec.replaceSortMergeJoin` | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the [Comet Tuning Guide](https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
+| `spark.comet.exec.strictFloatingPoint` | When enabled, fall back to Spark for floating-point operations that may differ from Spark, such as when comparing or sorting -0.0 and 0.0. For more information, refer to the [Comet Compatibility Guide](https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
| `spark.comet.expression.allowIncompatible` | Comet is not currently fully compatible with Spark for all expressions. Set this config to true to allow them anyway. For more information, refer to the [Comet Compatibility Guide](https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
| `spark.comet.maxTempDirectorySize` | The maximum amount of data (in bytes) stored inside the temporary directories. | 107374182400b |
| `spark.comet.metrics.updateInterval` | The interval in milliseconds to update metrics. If interval is negative, metrics will be updated upon task completion. | 3000 |
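Like other spark.comet.exec settings, the new entry can presumably also be toggled on a live session (a sketch assuming an existing `spark` session and that the conf is runtime-settable like its siblings):

    // Sketch: flip strict mode at runtime; later queries that sort or take
    // min/max of floats should then fall back to Spark where results may differ.
    spark.conf.set("spark.comet.exec.strictFloatingPoint", "true")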
diff --git a/spark/src/main/scala/org/apache/comet/serde/CometSortOrder.scala b/spark/src/main/scala/org/apache/comet/serde/CometSortOrder.scala
index c1c3cd6d1..aabe34f13 100644
--- a/spark/src/main/scala/org/apache/comet/serde/CometSortOrder.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/CometSortOrder.scala
@@ -41,9 +41,14 @@ object CometSortOrder extends CometExpressionSerde[SortOrder] {
}
}
- if (containsFloatingPoint(expr.child.dataType)) {
- Incompatible(Some(
- s"Sorting on floating-point is not 100% compatible with Spark.
${CometConf.COMPAT_GUIDE}"))
+ if (CometConf.COMET_EXEC_STRICT_FLOATING_POINT.get() &&
+ containsFloatingPoint(expr.child.dataType)) {
+ // https://github.com/apache/datafusion-comet/issues/2626
+ Incompatible(
+ Some(
+ "Sorting on floating-point is not 100% compatible with Spark, and
Comet is running " +
+ s"with ${CometConf.COMET_EXEC_STRICT_FLOATING_POINT.key}=true. " +
+ s"${CometConf.COMPAT_GUIDE}"))
} else {
Compatible()
}
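With the guard above, a sort over floating-point data stays in Spark when strict mode is on, and the Incompatible message is surfaced through Comet's usual fallback reporting. A hedged way to observe this (table and column names are illustrative):

    // Sketch: under strictFloatingPoint=true the sort should remain a Spark
    // SortExec rather than a Comet native sort; the plan output carries the reason.
    spark.conf.set("spark.comet.exec.strictFloatingPoint", "true")
    spark.sql("select * from tbl order by float_col").explain()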
diff --git a/spark/src/main/scala/org/apache/comet/serde/aggregates.scala b/spark/src/main/scala/org/apache/comet/serde/aggregates.scala
index 4b8a74c15..48344e061 100644
--- a/spark/src/main/scala/org/apache/comet/serde/aggregates.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/aggregates.scala
@@ -24,9 +24,10 @@ import scala.jdk.CollectionConverters._
import org.apache.spark.sql.catalyst.expressions.{Attribute, EvalMode}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average, BitAndAgg, BitOrAgg, BitXorAgg, BloomFilterAggregate, CentralMomentAgg, Corr, Count, Covariance, CovPopulation, CovSample, First, Last, Max, Min, StddevPop, StddevSamp, Sum, VariancePop, VarianceSamp}
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{ByteType, DecimalType, IntegerType, LongType, ShortType, StringType}
+import org.apache.spark.sql.types.{ByteType, DataTypes, DecimalType, IntegerType, LongType, ShortType, StringType}
import org.apache.comet.CometConf
+import org.apache.comet.CometConf.COMET_EXEC_STRICT_FLOATING_POINT
import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.serde.QueryPlanSerde.{exprToProto, serializeDataType}
@@ -42,6 +43,17 @@ object CometMin extends CometAggregateExpressionSerde[Min] {
withInfo(aggExpr, s"Unsupported data type: ${expr.dataType}")
return None
}
+
+ if (expr.dataType == DataTypes.FloatType || expr.dataType == DataTypes.DoubleType) {
+ if (CometConf.COMET_EXEC_STRICT_FLOATING_POINT.get()) {
+ // https://github.com/apache/datafusion-comet/issues/2448
+ withInfo(
+ aggExpr,
+ s"floating-point not supported when
${COMET_EXEC_STRICT_FLOATING_POINT.key}=true")
+ return None
+ }
+ }
+
val child = expr.children.head
val childExpr = exprToProto(child, inputs, binding)
val dataType = serializeDataType(expr.dataType)
@@ -78,6 +90,17 @@ object CometMax extends CometAggregateExpressionSerde[Max] {
withInfo(aggExpr, s"Unsupported data type: ${expr.dataType}")
return None
}
+
+ if (expr.dataType == DataTypes.FloatType || expr.dataType == DataTypes.DoubleType) {
+ if (CometConf.COMET_EXEC_STRICT_FLOATING_POINT.get()) {
+ // https://github.com/apache/datafusion-comet/issues/2448
+ withInfo(
+ aggExpr,
+ s"floating-point not supported when
${COMET_EXEC_STRICT_FLOATING_POINT.key}=true")
+ return None
+ }
+ }
+
val child = expr.children.head
val childExpr = exprToProto(child, inputs, binding)
val dataType = serializeDataType(expr.dataType)
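The guard added to CometMin is repeated verbatim in CometMax; one way it could be factored out is a small shared helper (a hypothetical sketch, not part of this commit):

    // Hypothetical helper (not in the commit): shared strict-FP guard for min/max serdes.
    import org.apache.spark.sql.types.{DataType, DataTypes}

    object StrictFpGuard {
      // Returns Some(reason) when the aggregate must fall back to Spark.
      def fallbackReason(dt: DataType, strict: Boolean): Option[String] =
        if (strict && (dt == DataTypes.FloatType || dt == DataTypes.DoubleType))
          Some("floating-point not supported when spark.comet.exec.strictFloatingPoint=true")
        else None
    }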
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index b1e437d5b..8b31cc028 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -1044,6 +1044,15 @@ trait CometBaseAggregate {
val aggExprs =
aggregateExpressions.map(aggExprToProto(_, output, binding, aggregate.conf))
+
+ if (aggExprs.exists(_.isEmpty)) {
+ withInfo(
+ aggregate,
+ "Unsupported aggregate expression(s)",
+ aggregateExpressions ++ aggregateExpressions.map(_.aggregateFunction): _*)
+ return None
+ }
+
if (childOp.nonEmpty && groupingExprs.forall(_.isDefined) &&
aggExprs.forall(_.isDefined)) {
val hashAggBuilder = OperatorOuterClass.HashAggregate.newBuilder()
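The early return uses `exists(_.isEmpty)` on the converted aggregate expressions, which is the negation of the `forall(_.isDefined)` check that follows; a quick standard-library confirmation of the idiom:

    // One failed conversion (None) is enough to trigger the early return.
    val converted: Seq[Option[Int]] = Seq(Some(1), None)
    assert(converted.exists(_.isEmpty))     // true: at least one conversion failed
    assert(!converted.forall(_.isDefined))  // the equivalent negated guard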
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index fae6bb535..7d2f03513 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -72,7 +72,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
DataGenOptions(generateNegativeZero = true))
df.createOrReplaceTempView("tbl")
- withSQLConf(CometConf.getExprAllowIncompatConfigKey("SortOrder") -> "false") {
+ withSQLConf(
+ CometConf.getExprAllowIncompatConfigKey("SortOrder") -> "false",
+ CometConf.COMET_EXEC_STRICT_FLOATING_POINT.key -> "true") {
checkSparkAnswerAndFallbackReasons(
"select * from tbl order by 1, 2",
Set(
@@ -94,7 +96,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
DataGenOptions(generateNegativeZero = true))
df.createOrReplaceTempView("tbl")
- withSQLConf(CometConf.getExprAllowIncompatConfigKey("SortOrder") -> "false") {
+ withSQLConf(
+ CometConf.getExprAllowIncompatConfigKey("SortOrder") -> "false",
+ CometConf.COMET_EXEC_STRICT_FLOATING_POINT.key -> "true") {
checkSparkAnswerAndFallbackReason(
"select * from tbl order by 1, 2",
"unsupported range partitioning sort order")
@@ -118,7 +122,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
DataGenOptions(generateNegativeZero = true))
df.createOrReplaceTempView("tbl")
- withSQLConf(CometConf.getExprAllowIncompatConfigKey("SortOrder") -> "false") {
+ withSQLConf(
+ CometConf.getExprAllowIncompatConfigKey("SortOrder") -> "false",
+ CometConf.COMET_EXEC_STRICT_FLOATING_POINT.key -> "true") {
checkSparkAnswerAndFallbackReason(
"select * from tbl order by 1, 2",
"unsupported range partitioning sort order")
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
index 5fdd24c46..1a61a0ddb 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
@@ -28,9 +28,11 @@ import org.apache.spark.sql.comet.CometHashAggregateExec
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.functions.{avg, count_distinct, sum}
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.comet.CometConf
-import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions}
+import org.apache.comet.CometConf.COMET_EXEC_STRICT_FLOATING_POINT
+import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator, ParquetGenerator, SchemaGenOptions}
/**
* Test suite dedicated to Comet native aggregate operator
@@ -38,6 +40,39 @@ import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOpti
class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
import testImplicits._
+ test("min/max floating point with negative zero") {
+ val r = new Random(42)
+ val schema = StructType(
+ Seq(
+ StructField("float_col", DataTypes.FloatType, nullable = true),
+ StructField("double_col", DataTypes.DoubleType, nullable = true)))
+ val df = FuzzDataGenerator.generateDataFrame(
+ r,
+ spark,
+ schema,
+ 1000,
+ DataGenOptions(generateNegativeZero = true))
+ df.createOrReplaceTempView("tbl")
+
+ for (col <- Seq("float_col", "double_col")) {
+ // assert that data contains positive and negative zero
+ assert(spark.sql(s"select * from tbl where cast($col as string) = '0.0'").count() > 0)
+ assert(spark.sql(s"select * from tbl where cast($col as string) = '-0.0'").count() > 0)
+ for (agg <- Seq("min", "max")) {
+ withSQLConf(COMET_EXEC_STRICT_FLOATING_POINT.key -> "true") {
+ checkSparkAnswerAndFallbackReasons(
+ s"select $agg($col) from tbl where cast($col as string) in ('0.0',
'-0.0')",
+ Set(
+ "Unsupported aggregate expression(s)",
+ s"floating-point not supported when
${COMET_EXEC_STRICT_FLOATING_POINT.key}=true"))
+ }
+ checkSparkAnswer(
+ s"select $col, count(*) from tbl " +
+ s"where cast($col as string) in ('0.0', '-0.0') group by $col")
+ }
+ }
+ }
+
test("avg decimal") {
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "test.parquet")
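A note on the test's `cast($col as string)` filters above: SQL equality normalizes the two zeros, so a plain numeric predicate could not isolate -0.0 rows, while their string renderings differ. A standard-library check (not from the commit):

    println(0.0f == -0.0f)        // true: numeric equality merges the zeros
    println((-0.0f).toString)     // "-0.0"
    println(0.0f.toString)        // "0.0"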
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]