This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new f49e4b6f7 chore: Mark expressions with known correctness issues as incompatible (#3675)
f49e4b6f7 is described below
commit f49e4b6f7cca9ce40f5cd5a2dfa2d23511170cd1
Author: Andy Grove <[email protected]>
AuthorDate: Thu Mar 12 16:54:03 2026 -0600
chore: Mark expressions with known correctness issues as incompatible (#3675)
* fix: mark expressions with known correctness issues as incompatible
Review all open correctness issues and mark affected expressions as
Incompatible so they fall back to Spark by default. Update the
compatibility guide with detailed documentation of each incompatibility
and links to tracking issues.
Expressions marked Incompatible:
- ArrayContains (#3346), GetArrayItem (#3330, #3332), ArrayRemove (#3173)
- Hour, Minute, Second for TimestampNTZ inputs (#3180)
- TruncTimestamp for non-UTC timezones (#2649)
- Ceil, Floor for Decimal inputs (#1729)
- Tan (#1897), Corr (#2646), StructsToJson (#3016)
* fix: reformat expressions.md with prettier
* fix: enable allowIncompatible in tests for newly incompatible expressions
* fix: apply spotless formatting to updated test files
* fix: enable ArrayContains allowIncompatible for map_contains_key test
map_contains_key is internally rewritten by Spark to use ArrayContains,
so it needs the allowIncompatible config to run natively.
* fix: remove unused GetArrayItem import from CometArrayExpressionSuite
* fix: enable Corr allowIncompatible for covariance & correlation test
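For readers unfamiliar with the serde pattern these commits touch: each expression serde can report a support level, and `Incompatible` makes the expression fall back to Spark unless the per-expression `allowIncompatible` override is set. A minimal self-contained sketch of that pattern (simplified stand-ins, not Comet's actual class hierarchy):

```scala
// Simplified stand-ins for Comet's SupportLevel hierarchy (hypothetical,
// reduced versions of the real serde classes in this diff).
sealed trait SupportLevel
case class Compatible(notes: Option[String] = None) extends SupportLevel
case class Incompatible(notes: Option[String] = None) extends SupportLevel
case class Unsupported(notes: Option[String] = None) extends SupportLevel

// A serde can gate on the input type, as CometCeil does for Decimal inputs.
def ceilSupportLevel(inputTypeName: String): SupportLevel =
  inputTypeName match {
    case "decimal" =>
      Incompatible(Some(
        "Incorrect results for Decimal type inputs" +
          " (https://github.com/apache/datafusion-comet/issues/1729)"))
    case _ => Compatible()
  }

println(ceilSupportLevel("decimal")) // falls back to Spark by default
println(ceilSupportLevel("double"))  // runs natively
```

The point of the pattern is that the correctness caveat travels with the expression: the note string surfaces in Comet's fallback explanation rather than in a silent wrong answer.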
---
docs/source/user-guide/latest/compatibility.md | 45 ++++
docs/source/user-guide/latest/expressions.md | 196 ++++++++--------
.../org/apache/comet/serde/QueryPlanSerde.scala | 2 +-
.../scala/org/apache/comet/serde/aggregates.scala | 7 +
.../main/scala/org/apache/comet/serde/arrays.scala | 21 ++
.../scala/org/apache/comet/serde/datetime.scala | 47 +++-
.../main/scala/org/apache/comet/serde/math.scala | 44 +++-
.../scala/org/apache/comet/serde/structs.scala | 6 +
.../sql-tests/expressions/aggregate/corr.sql | 1 +
.../sql-tests/expressions/array/array_remove.sql | 1 +
.../expressions/datetime/trunc_timestamp.sql | 1 +
.../sql-tests/expressions/map/map_contains_key.sql | 1 +
.../resources/sql-tests/expressions/math/ceil.sql | 1 +
.../resources/sql-tests/expressions/math/floor.sql | 1 +
.../apache/comet/CometArrayExpressionSuite.scala | 177 +++++++-------
.../org/apache/comet/CometExpressionSuite.scala | 253 +++++++++++----------
.../apache/comet/exec/CometAggregateSuite.scala | 5 +-
.../apache/comet/exec/CometNativeReaderSuite.scala | 5 +-
.../apache/comet/parquet/ParquetReadSuite.scala | 5 +-
19 files changed, 515 insertions(+), 304 deletions(-)
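Among the issues addressed below, the `Tan` one (#1897) comes down to the sign of negative zero: Spark evaluates `tan` via `java.lang.Math.tan`, which is sign-preserving for signed zeros, while the native implementation returned `0.0`. A small JVM-side sketch of the behavior Spark expects:

```scala
// java.lang.Math.tan is sign-preserving for signed zeros: tan(-0.0) == -0.0.
// Plain equality cannot distinguish 0.0 from -0.0, so inspect the sign via
// the reciprocal (1.0 / -0.0 is -Infinity, 1.0 / 0.0 is +Infinity).
val negZeroTan = math.tan(-0.0)
val posZeroTan = math.tan(0.0)

println(1.0 / negZeroTan) // -Infinity: the sign of the zero survived
println(1.0 / posZeroTan) // Infinity
```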
diff --git a/docs/source/user-guide/latest/compatibility.md b/docs/source/user-guide/latest/compatibility.md
index 21695bdf5..e0bc5f06e 100644
--- a/docs/source/user-guide/latest/compatibility.md
+++ b/docs/source/user-guide/latest/compatibility.md
@@ -58,6 +58,51 @@ Expressions that are not 100% Spark-compatible will fall back to Spark by defaul
`spark.comet.expression.EXPRNAME.allowIncompatible=true`, where `EXPRNAME` is the Spark expression class name. See
the [Comet Supported Expressions Guide](expressions.md) for more information on this configuration setting.
+### Array Expressions
+
+- **ArrayContains**: Returns null instead of false for empty arrays with literal values.
+  [#3346](https://github.com/apache/datafusion-comet/issues/3346)
+- **ArrayRemove**: Returns null when the element to remove is null, instead of removing null elements from the array.
+  [#3173](https://github.com/apache/datafusion-comet/issues/3173)
+- **GetArrayItem**: Known correctness issues with index handling, including off-by-one errors and incorrect results
+  with dynamic (non-literal) index values.
+  [#3330](https://github.com/apache/datafusion-comet/issues/3330),
+  [#3332](https://github.com/apache/datafusion-comet/issues/3332)
+- **ArraysOverlap**: Inconsistent behavior when arrays contain NULL values.
+  [#3645](https://github.com/apache/datafusion-comet/issues/3645),
+  [#2036](https://github.com/apache/datafusion-comet/issues/2036)
+- **ArrayUnion**: Sorts input arrays before performing the union, while Spark preserves the order of the first array
+  and appends unique elements from the second.
+  [#3644](https://github.com/apache/datafusion-comet/issues/3644)
+
+### Date/Time Expressions
+
+- **Hour, Minute, Second**: Incorrectly apply timezone conversion to TimestampNTZ inputs. TimestampNTZ stores local
+  time without timezone, so no conversion should be applied. These expressions work correctly with Timestamp inputs.
+  [#3180](https://github.com/apache/datafusion-comet/issues/3180)
+- **TruncTimestamp (date_trunc)**: Produces incorrect results when used with non-UTC timezones. Compatible when
+  timezone is UTC.
+  [#2649](https://github.com/apache/datafusion-comet/issues/2649)
+
+### Math Expressions
+
+- **Ceil, Floor**: Incorrect results for Decimal type inputs.
+  [#1729](https://github.com/apache/datafusion-comet/issues/1729)
+- **Tan**: `tan(-0.0)` produces `0.0` instead of `-0.0`.
+  [#1897](https://github.com/apache/datafusion-comet/issues/1897)
+
+### Aggregate Expressions
+
+- **Corr**: Returns null instead of NaN in some edge cases.
+  [#2646](https://github.com/apache/datafusion-comet/issues/2646)
+- **First, Last**: These functions are not deterministic. When `ignoreNulls` is set, results may not match Spark.
+  [#1630](https://github.com/apache/datafusion-comet/issues/1630)
+
+### Struct Expressions
+
+- **StructsToJson (to_json)**: Does not support `+Infinity` and `-Infinity` for numeric types (float, double).
+  [#3016](https://github.com/apache/datafusion-comet/issues/3016)
+
## Regular Expressions
Comet uses the Rust regexp crate for evaluating regular expressions, and this has different behavior from Java's
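The TimestampNTZ incompatibility documented above (#3180) comes down to timezone-naive vs timezone-aware semantics. It can be illustrated with plain `java.time` (an analogy only, not Comet code; the date and zone below are arbitrary examples):

```scala
import java.time.{LocalDateTime, ZoneId, ZoneOffset}

// TimestampNTZ is analogous to LocalDateTime: it already holds the wall-clock
// value, so hour() should simply read the field with no timezone conversion.
val ntz = LocalDateTime.of(2026, 3, 12, 23, 30, 0)
println(ntz.getHour) // 23

// A timezone-aware Timestamp is analogous to an Instant: extracting the hour
// requires converting into the session timezone, which shifts the value.
val instant = ntz.toInstant(ZoneOffset.UTC)
val inParis = instant.atZone(ZoneId.of("Europe/Paris"))
println(inParis.getHour) // shifted by the zone offset
```

Applying the second code path to the first kind of value is exactly the bug: the conversion belongs only to timezone-aware timestamps.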
diff --git a/docs/source/user-guide/latest/expressions.md b/docs/source/user-guide/latest/expressions.md
index 0339cd2a3..57b7a3455 100644
--- a/docs/source/user-guide/latest/expressions.md
+++ b/docs/source/user-guide/latest/expressions.md
@@ -92,76 +92,76 @@ Expressions that are not Spark-compatible will fall back to Spark by default and
## Date/Time Functions
-| Expression | SQL | Spark-Compatible? | Compatibility Notes |
-| ---------- | --- | ----------------- | ------------------- |
-| DateAdd | `date_add` | Yes | |
-| DateDiff | `datediff` | Yes | |
-| DateFormat | `date_format` | Yes | Partial support. Only specific format patterns are supported. |
-| DateSub | `date_sub` | Yes | |
-| DatePart | `date_part(field, source)` | Yes | Supported values of `field`: `year`/`month`/`week`/`day`/`dayofweek`/`dayofweek_iso`/`doy`/`quarter`/`hour`/`minute` |
-| Extract | `extract(field FROM source)` | Yes | Supported values of `field`: `year`/`month`/`week`/`day`/`dayofweek`/`dayofweek_iso`/`doy`/`quarter`/`hour`/`minute` |
-| FromUnixTime | `from_unixtime` | No | Does not support format, supports only -8334601211038 <= sec <= 8210266876799 |
-| Hour | `hour` | Yes | |
-| LastDay | `last_day` | Yes | |
-| Minute | `minute` | Yes | |
-| Second | `second` | Yes | |
-| TruncDate | `trunc` | Yes | |
-| TruncTimestamp | `date_trunc` | Yes | |
-| UnixDate | `unix_date` | Yes | |
-| UnixTimestamp | `unix_timestamp` | Yes | |
-| Year | `year` | Yes | |
-| Month | `month` | Yes | |
-| DayOfMonth | `day`/`dayofmonth` | Yes | |
-| DayOfWeek | `dayofweek` | Yes | |
-| WeekDay | `weekday` | Yes | |
-| DayOfYear | `dayofyear` | Yes | |
-| WeekOfYear | `weekofyear` | Yes | |
-| Quarter | `quarter` | Yes | |
+| Expression | SQL | Spark-Compatible? | Compatibility Notes |
+| ---------- | --- | ----------------- | ------------------- |
+| DateAdd | `date_add` | Yes | |
+| DateDiff | `datediff` | Yes | |
+| DateFormat | `date_format` | Yes | Partial support. Only specific format patterns are supported. |
+| DateSub | `date_sub` | Yes | |
+| DatePart | `date_part(field, source)` | Yes | Supported values of `field`: `year`/`month`/`week`/`day`/`dayofweek`/`dayofweek_iso`/`doy`/`quarter`/`hour`/`minute` |
+| Extract | `extract(field FROM source)` | Yes | Supported values of `field`: `year`/`month`/`week`/`day`/`dayofweek`/`dayofweek_iso`/`doy`/`quarter`/`hour`/`minute` |
+| FromUnixTime | `from_unixtime` | No | Does not support format, supports only -8334601211038 <= sec <= 8210266876799 |
+| Hour | `hour` | No | Incorrectly applies timezone conversion to TimestampNTZ inputs ([#3180](https://github.com/apache/datafusion-comet/issues/3180)) |
+| LastDay | `last_day` | Yes | |
+| Minute | `minute` | No | Incorrectly applies timezone conversion to TimestampNTZ inputs ([#3180](https://github.com/apache/datafusion-comet/issues/3180)) |
+| Second | `second` | No | Incorrectly applies timezone conversion to TimestampNTZ inputs ([#3180](https://github.com/apache/datafusion-comet/issues/3180)) |
+| TruncDate | `trunc` | Yes | |
+| TruncTimestamp | `date_trunc` | No | Incorrect results in non-UTC timezones ([#2649](https://github.com/apache/datafusion-comet/issues/2649)) |
+| UnixDate | `unix_date` | Yes | |
+| UnixTimestamp | `unix_timestamp` | Yes | |
+| Year | `year` | Yes | |
+| Month | `month` | Yes | |
+| DayOfMonth | `day`/`dayofmonth` | Yes | |
+| DayOfWeek | `dayofweek` | Yes | |
+| WeekDay | `weekday` | Yes | |
+| DayOfYear | `dayofyear` | Yes | |
+| WeekOfYear | `weekofyear` | Yes | |
+| Quarter | `quarter` | Yes | |
## Math Expressions
-| Expression | SQL | Spark-Compatible? | Compatibility Notes |
-| ---------- | --- | ----------------- | ------------------- |
-| Abs | `abs` | Yes | |
-| Acos | `acos` | Yes | |
-| Add | `+` | Yes | |
-| Asin | `asin` | Yes | |
-| Atan | `atan` | Yes | |
-| Atan2 | `atan2` | Yes | |
-| BRound | `bround` | Yes | |
-| Ceil | `ceil` | Yes | |
-| Cos | `cos` | Yes | |
-| Cosh | `cosh` | Yes | |
-| Cot | `cot` | Yes | |
-| Divide | `/` | Yes | |
-| Exp | `exp` | Yes | |
-| Expm1 | `expm1` | Yes | |
-| Floor | `floor` | Yes | |
-| Hex | `hex` | Yes | |
-| IntegralDivide | `div` | Yes | |
-| IsNaN | `isnan` | Yes | |
-| Log | `log` | Yes | |
-| Log2 | `log2` | Yes | |
-| Log10 | `log10` | Yes | |
-| Multiply | `*` | Yes | |
-| Pow | `power` | Yes | |
-| Rand | `rand` | Yes | |
-| Randn | `randn` | Yes | |
-| Remainder | `%` | Yes | |
-| Round | `round` | Yes | |
-| Signum | `signum` | Yes | |
-| Sin | `sin` | Yes | |
-| Sinh | `sinh` | Yes | |
-| Sqrt | `sqrt` | Yes | |
-| Subtract | `-` | Yes | |
-| Tan | `tan` | Yes | |
-| Tanh | `tanh` | Yes | |
-| TryAdd | `try_add` | Yes | Only integer inputs are supported |
-| TryDivide | `try_div` | Yes | Only integer inputs are supported |
-| TryMultiply | `try_mul` | Yes | Only integer inputs are supported |
-| TrySubtract | `try_sub` | Yes | Only integer inputs are supported |
-| UnaryMinus | `-` | Yes | |
-| Unhex | `unhex` | Yes | |
+| Expression | SQL | Spark-Compatible? | Compatibility Notes |
+| ---------- | --- | ----------------- | ------------------- |
+| Abs | `abs` | Yes | |
+| Acos | `acos` | Yes | |
+| Add | `+` | Yes | |
+| Asin | `asin` | Yes | |
+| Atan | `atan` | Yes | |
+| Atan2 | `atan2` | Yes | |
+| BRound | `bround` | Yes | |
+| Ceil | `ceil` | No | Incorrect results for Decimal type inputs ([#1729](https://github.com/apache/datafusion-comet/issues/1729)) |
+| Cos | `cos` | Yes | |
+| Cosh | `cosh` | Yes | |
+| Cot | `cot` | Yes | |
+| Divide | `/` | Yes | |
+| Exp | `exp` | Yes | |
+| Expm1 | `expm1` | Yes | |
+| Floor | `floor` | No | Incorrect results for Decimal type inputs ([#1729](https://github.com/apache/datafusion-comet/issues/1729)) |
+| Hex | `hex` | Yes | |
+| IntegralDivide | `div` | Yes | |
+| IsNaN | `isnan` | Yes | |
+| Log | `log` | Yes | |
+| Log2 | `log2` | Yes | |
+| Log10 | `log10` | Yes | |
+| Multiply | `*` | Yes | |
+| Pow | `power` | Yes | |
+| Rand | `rand` | Yes | |
+| Randn | `randn` | Yes | |
+| Remainder | `%` | Yes | |
+| Round | `round` | Yes | |
+| Signum | `signum` | Yes | |
+| Sin | `sin` | Yes | |
+| Sinh | `sinh` | Yes | |
+| Sqrt | `sqrt` | Yes | |
+| Subtract | `-` | Yes | |
+| Tan | `tan` | No | tan(-0.0) produces incorrect result ([#1897](https://github.com/apache/datafusion-comet/issues/1897)) |
+| Tanh | `tanh` | Yes | |
+| TryAdd | `try_add` | Yes | Only integer inputs are supported |
+| TryDivide | `try_div` | Yes | Only integer inputs are supported |
+| TryMultiply | `try_mul` | Yes | Only integer inputs are supported |
+| TrySubtract | `try_sub` | Yes | Only integer inputs are supported |
+| UnaryMinus | `-` | Yes | |
+| Unhex | `unhex` | Yes | |
## Hashing Functions
@@ -188,27 +188,27 @@ Expressions that are not Spark-compatible will fall back to Spark by default and
## Aggregate Expressions
-| Expression | SQL | Spark-Compatible? | Compatibility Notes |
-| ---------- | --- | ----------------- | ------------------- |
-| Average | | Yes, except for ANSI mode | |
-| BitAndAgg | | Yes | |
-| BitOrAgg | | Yes | |
-| BitXorAgg | | Yes | |
-| BoolAnd | `bool_and` | Yes | |
-| BoolOr | `bool_or` | Yes | |
-| Corr | | Yes | |
-| Count | | Yes | |
-| CovPopulation | | Yes | |
-| CovSample | | Yes | |
-| First | | No | This function is not deterministic. Results may not match Spark. |
-| Last | | No | This function is not deterministic. Results may not match Spark. |
-| Max | | Yes | |
-| Min | | Yes | |
-| StddevPop | | Yes | |
-| StddevSamp | | Yes | |
-| Sum | | Yes, except for ANSI mode | |
-| VariancePop | | Yes | |
-| VarianceSamp | | Yes | |
+| Expression | SQL | Spark-Compatible? | Compatibility Notes |
+| ---------- | --- | ----------------- | ------------------- |
+| Average | | Yes, except for ANSI mode | |
+| BitAndAgg | | Yes | |
+| BitOrAgg | | Yes | |
+| BitXorAgg | | Yes | |
+| BoolAnd | `bool_and` | Yes | |
+| BoolOr | `bool_or` | Yes | |
+| Corr | | No | Returns null instead of NaN in some edge cases ([#2646](https://github.com/apache/datafusion-comet/issues/2646)) |
+| Count | | Yes | |
+| CovPopulation | | Yes | |
+| CovSample | | Yes | |
+| First | | No | This function is not deterministic. Results may not match Spark. |
+| Last | | No | This function is not deterministic. Results may not match Spark. |
+| Max | | Yes | |
+| Min | | Yes | |
+| StddevPop | | Yes | |
+| StddevSamp | | Yes | |
+| Sum | | Yes, except for ANSI mode | |
+| VariancePop | | Yes | |
+| VarianceSamp | | Yes | |
## Window Functions
@@ -233,7 +233,7 @@ Comet supports using the following aggregate functions within window contexts wi
| -------------- | ----------------- | ------------------- |
| ArrayAppend | No | |
| ArrayCompact | No | |
-| ArrayContains | Yes | |
+| ArrayContains | No | Returns null instead of false for empty arrays with literal values ([#3346](https://github.com/apache/datafusion-comet/issues/3346)) |
| ArrayDistinct | No | Behaves differently than spark. Comet first sorts then removes duplicates while Spark preserves the original order. |
| ArrayExcept | No | |
| ArrayFilter | Yes | Only supports case where function is `IsNotNull` |
@@ -242,14 +242,14 @@ Comet supports using the following aggregate functions within window contexts wi
| ArrayJoin | No | |
| ArrayMax | Yes | |
| ArrayMin | Yes | |
-| ArrayRemove | Yes | |
+| ArrayRemove | No | Returns null when element is null instead of removing null elements ([#3173](https://github.com/apache/datafusion-comet/issues/3173)) |
| ArrayRepeat | No | |
| ArrayUnion | No | Behaves differently than spark. Comet sorts the input arrays before performing the union, while Spark preserves the order of the first array and appends unique elements from the second. |
| ArraysOverlap | No | |
| CreateArray | Yes | |
| ElementAt | Yes | Input must be an array. Map inputs are not supported. |
| Flatten | Yes | |
-| GetArrayItem | Yes | |
+| GetArrayItem | No | Known correctness issues with index handling ([#3330](https://github.com/apache/datafusion-comet/issues/3330), [#3332](https://github.com/apache/datafusion-comet/issues/3332)) |
## Map Expressions
@@ -263,13 +263,13 @@ Comet supports using the following aggregate functions within window contexts wi
## Struct Expressions
-| Expression | Spark-Compatible? | Compatibility Notes |
-| -------------------- | ----------------- | ------------------- |
-| CreateNamedStruct | Yes | |
-| GetArrayStructFields | Yes | |
-| GetStructField | Yes | |
-| JsonToStructs | No | Partial support. Requires explicit schema. |
-| StructsToJson | Yes | |
+| Expression | Spark-Compatible? | Compatibility Notes |
+| -------------------- | ----------------- | ------------------- |
+| CreateNamedStruct | Yes | |
+| GetArrayStructFields | Yes | |
+| GetStructField | Yes | |
+| JsonToStructs | No | Partial support. Requires explicit schema. |
+| StructsToJson | No | Does not support Infinity/-Infinity for numeric types ([#3016](https://github.com/apache/datafusion-comet/issues/3016)) |
## Conversion Expressions
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 0b9dadd35..8c39ba779 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -116,7 +116,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
classOf[Sinh] -> CometScalarFunction("sinh"),
classOf[Sqrt] -> CometScalarFunction("sqrt"),
classOf[Subtract] -> CometSubtract,
- classOf[Tan] -> CometScalarFunction("tan"),
+ classOf[Tan] -> CometTan,
classOf[Tanh] -> CometScalarFunction("tanh"),
classOf[Cot] -> CometScalarFunction("cot"),
classOf[UnaryMinus] -> CometUnaryMinus,
diff --git a/spark/src/main/scala/org/apache/comet/serde/aggregates.scala b/spark/src/main/scala/org/apache/comet/serde/aggregates.scala
index 8e58c0874..1485589b4 100644
--- a/spark/src/main/scala/org/apache/comet/serde/aggregates.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/aggregates.scala
@@ -584,6 +584,13 @@ object CometStddevPop extends CometAggregateExpressionSerde[StddevPop] with Come
}
object CometCorr extends CometAggregateExpressionSerde[Corr] {
+
+ override def getSupportLevel(expr: Corr): SupportLevel =
+ Incompatible(
+ Some(
+ "Returns null instead of NaN in some edge cases" +
+ " (https://github.com/apache/datafusion-comet/issues/2646)"))
+
override def convert(
aggExpr: AggregateExpression,
corr: Corr,
diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala b/spark/src/main/scala/org/apache/comet/serde/arrays.scala
index 79e995f2e..c82018fe6 100644
--- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala
@@ -35,6 +35,12 @@ object CometArrayRemove
with CometExprShim
with ArraysBase {
+ override def getSupportLevel(expr: ArrayRemove): SupportLevel =
+ Incompatible(
+ Some(
+ "Returns null when element is null instead of removing null elements" +
+ " (https://github.com/apache/datafusion-comet/issues/3173)"))
+
override def convert(
expr: ArrayRemove,
inputs: Seq[Attribute],
@@ -131,6 +137,13 @@ object CometArrayAppend extends CometExpressionSerde[ArrayAppend] {
}
object CometArrayContains extends CometExpressionSerde[ArrayContains] {
+
+ override def getSupportLevel(expr: ArrayContains): SupportLevel =
+ Incompatible(
+ Some(
+ "Returns null instead of false for empty arrays with literal values" +
+ " (https://github.com/apache/datafusion-comet/issues/3346)"))
+
override def convert(
expr: ArrayContains,
inputs: Seq[Attribute],
@@ -472,6 +485,14 @@ object CometCreateArray extends CometExpressionSerde[CreateArray] {
}
object CometGetArrayItem extends CometExpressionSerde[GetArrayItem] {
+
+ override def getSupportLevel(expr: GetArrayItem): SupportLevel =
+ Incompatible(
+ Some(
+ "Known correctness issues with index handling" +
+ " (https://github.com/apache/datafusion-comet/issues/3330," +
+ " https://github.com/apache/datafusion-comet/issues/3332)"))
+
override def convert(
expr: GetArrayItem,
inputs: Seq[Attribute],
diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala
index d36b6a3b4..0720f785d 100644
--- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala
@@ -177,6 +177,18 @@ object CometQuarter extends CometExpressionSerde[Quarter] with CometExprGetDateF
}
object CometHour extends CometExpressionSerde[Hour] {
+
+ override def getSupportLevel(expr: Hour): SupportLevel = {
+ if (expr.child.dataType.typeName == "timestamp_ntz") {
+ Incompatible(
+ Some(
+ "Incorrectly applies timezone conversion to TimestampNTZ inputs" +
+ " (https://github.com/apache/datafusion-comet/issues/3180)"))
+ } else {
+ Compatible()
+ }
+ }
+
override def convert(
expr: Hour,
inputs: Seq[Attribute],
@@ -203,6 +215,18 @@ object CometHour extends CometExpressionSerde[Hour] {
}
object CometMinute extends CometExpressionSerde[Minute] {
+
+ override def getSupportLevel(expr: Minute): SupportLevel = {
+ if (expr.child.dataType.typeName == "timestamp_ntz") {
+ Incompatible(
+ Some(
+ "Incorrectly applies timezone conversion to TimestampNTZ inputs" +
+ " (https://github.com/apache/datafusion-comet/issues/3180)"))
+ } else {
+ Compatible()
+ }
+ }
+
override def convert(
expr: Minute,
inputs: Seq[Attribute],
@@ -229,6 +253,18 @@ object CometMinute extends CometExpressionSerde[Minute] {
}
object CometSecond extends CometExpressionSerde[Second] {
+
+ override def getSupportLevel(expr: Second): SupportLevel = {
+ if (expr.child.dataType.typeName == "timestamp_ntz") {
+ Incompatible(
+ Some(
+ "Incorrectly applies timezone conversion to TimestampNTZ inputs" +
+ " (https://github.com/apache/datafusion-comet/issues/3180)"))
+ } else {
+ Compatible()
+ }
+ }
+
override def convert(
expr: Second,
inputs: Seq[Attribute],
@@ -402,10 +438,19 @@ object CometTruncTimestamp extends CometExpressionSerde[TruncTimestamp] {
"microsecond")
override def getSupportLevel(expr: TruncTimestamp): SupportLevel = {
+ val timezone = expr.timeZoneId.getOrElse("UTC")
+ val isUtc = timezone == "UTC" || timezone == "Etc/UTC"
expr.format match {
case Literal(fmt: UTF8String, _) =>
if (supportedFormats.contains(fmt.toString.toLowerCase(Locale.ROOT))) {
- Compatible()
+ if (isUtc) {
+ Compatible()
+ } else {
+ Incompatible(
+ Some(
+ s"Incorrect results in non-UTC timezone '$timezone'" +
+ " (https://github.com/apache/datafusion-comet/issues/2649)"))
+ }
} else {
Unsupported(Some(s"Format $fmt is not supported"))
}
diff --git a/spark/src/main/scala/org/apache/comet/serde/math.scala b/spark/src/main/scala/org/apache/comet/serde/math.scala
index 68b6e8d11..5a0393142 100644
--- a/spark/src/main/scala/org/apache/comet/serde/math.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/math.scala
@@ -19,7 +19,7 @@
package org.apache.comet.serde
-import org.apache.spark.sql.catalyst.expressions.{Abs, Atan2, Attribute, Ceil, CheckOverflow, Expression, Floor, Hex, If, LessThanOrEqual, Literal, Log, Log10, Log2, Unhex}
+import org.apache.spark.sql.catalyst.expressions.{Abs, Atan2, Attribute, Ceil, CheckOverflow, Expression, Floor, Hex, If, LessThanOrEqual, Literal, Log, Log10, Log2, Tan, Unhex}
import org.apache.spark.sql.types.{DecimalType, NumericType}
import org.apache.comet.CometSparkSessionExtensions.withInfo
@@ -38,6 +38,18 @@ object CometAtan2 extends CometExpressionSerde[Atan2] {
}
object CometCeil extends CometExpressionSerde[Ceil] {
+
+ override def getSupportLevel(expr: Ceil): SupportLevel = {
+ expr.child.dataType match {
+ case _: DecimalType =>
+ Incompatible(
+ Some(
+ "Incorrect results for Decimal type inputs" +
+ " (https://github.com/apache/datafusion-comet/issues/1729)"))
+ case _ => Compatible()
+ }
+ }
+
override def convert(
expr: Ceil,
inputs: Seq[Attribute],
@@ -58,6 +70,18 @@ object CometCeil extends CometExpressionSerde[Ceil] {
}
object CometFloor extends CometExpressionSerde[Floor] {
+
+ override def getSupportLevel(expr: Floor): SupportLevel = {
+ expr.child.dataType match {
+ case _: DecimalType =>
+ Incompatible(
+ Some(
+ "Incorrect results for Decimal type inputs" +
+ " (https://github.com/apache/datafusion-comet/issues/1729)"))
+ case _ => Compatible()
+ }
+ }
+
override def convert(
expr: Floor,
inputs: Seq[Attribute],
@@ -174,6 +198,24 @@ object CometAbs extends CometExpressionSerde[Abs] with MathExprBase {
}
}
+object CometTan extends CometExpressionSerde[Tan] {
+
+ override def getSupportLevel(expr: Tan): SupportLevel =
+ Incompatible(
+ Some(
+ "tan(-0.0) produces incorrect result" +
+ " (https://github.com/apache/datafusion-comet/issues/1897)"))
+
+ override def convert(
+ expr: Tan,
+ inputs: Seq[Attribute],
+ binding: Boolean): Option[ExprOuterClass.Expr] = {
+ val childExpr = expr.children.map(exprToProtoInternal(_, inputs, binding))
+ val optExpr = scalarFunctionExprToProto("tan", childExpr: _*)
+ optExprWithInfo(optExpr, expr, expr.children: _*)
+ }
+}
+
sealed trait MathExprBase {
protected def nullIfNegative(expression: Expression): Expression = {
val zero = Literal.default(expression.dataType)
diff --git a/spark/src/main/scala/org/apache/comet/serde/structs.scala b/spark/src/main/scala/org/apache/comet/serde/structs.scala
index d9d83f659..449d0fc5b 100644
--- a/spark/src/main/scala/org/apache/comet/serde/structs.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/structs.scala
@@ -105,6 +105,12 @@ object CometGetArrayStructFields extends CometExpressionSerde[GetArrayStructFiel
object CometStructsToJson extends CometExpressionSerde[StructsToJson] {
+ override def getSupportLevel(expr: StructsToJson): SupportLevel =
+ Incompatible(
+ Some(
+ "Does not support Infinity/-Infinity for numeric types" +
+ " (https://github.com/apache/datafusion-comet/issues/3016)"))
+
override def convert(
expr: StructsToJson,
inputs: Seq[Attribute],
diff --git a/spark/src/test/resources/sql-tests/expressions/aggregate/corr.sql b/spark/src/test/resources/sql-tests/expressions/aggregate/corr.sql
index 9d11ba0ca..4231da316 100644
--- a/spark/src/test/resources/sql-tests/expressions/aggregate/corr.sql
+++ b/spark/src/test/resources/sql-tests/expressions/aggregate/corr.sql
@@ -15,6 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
+-- Config: spark.comet.expression.Corr.allowIncompatible=true
-- ConfigMatrix: parquet.enable.dictionary=false,true
statement
diff --git a/spark/src/test/resources/sql-tests/expressions/array/array_remove.sql b/spark/src/test/resources/sql-tests/expressions/array/array_remove.sql
index f91089d55..aead1fa44 100644
--- a/spark/src/test/resources/sql-tests/expressions/array/array_remove.sql
+++ b/spark/src/test/resources/sql-tests/expressions/array/array_remove.sql
@@ -15,6 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
+-- Config: spark.comet.expression.ArrayRemove.allowIncompatible=true
-- ConfigMatrix: parquet.enable.dictionary=false,true
statement
diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/trunc_timestamp.sql b/spark/src/test/resources/sql-tests/expressions/datetime/trunc_timestamp.sql
index 1105d014f..661866543 100644
--- a/spark/src/test/resources/sql-tests/expressions/datetime/trunc_timestamp.sql
+++ b/spark/src/test/resources/sql-tests/expressions/datetime/trunc_timestamp.sql
@@ -15,6 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
+-- Config: spark.comet.expression.TruncTimestamp.allowIncompatible=true
-- ConfigMatrix: parquet.enable.dictionary=false,true
statement
diff --git a/spark/src/test/resources/sql-tests/expressions/map/map_contains_key.sql b/spark/src/test/resources/sql-tests/expressions/map/map_contains_key.sql
index 7dc3ce436..a70f4a1bd 100644
--- a/spark/src/test/resources/sql-tests/expressions/map/map_contains_key.sql
+++ b/spark/src/test/resources/sql-tests/expressions/map/map_contains_key.sql
@@ -15,6 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
+-- Config: spark.comet.expression.ArrayContains.allowIncompatible=true
-- ConfigMatrix: parquet.enable.dictionary=false,true
-- TODO: replace map_from_arrays with map whenever map is supported in Comet
diff --git a/spark/src/test/resources/sql-tests/expressions/math/ceil.sql b/spark/src/test/resources/sql-tests/expressions/math/ceil.sql
index fade75d28..c11c42bed 100644
--- a/spark/src/test/resources/sql-tests/expressions/math/ceil.sql
+++ b/spark/src/test/resources/sql-tests/expressions/math/ceil.sql
@@ -15,6 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
+-- Config: spark.comet.expression.Ceil.allowIncompatible=true
-- ConfigMatrix: parquet.enable.dictionary=false,true
statement
diff --git a/spark/src/test/resources/sql-tests/expressions/math/floor.sql b/spark/src/test/resources/sql-tests/expressions/math/floor.sql
index 396000284..62fa2a404 100644
--- a/spark/src/test/resources/sql-tests/expressions/math/floor.sql
+++ b/spark/src/test/resources/sql-tests/expressions/math/floor.sql
@@ -15,6 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
+-- Config: spark.comet.expression.Floor.allowIncompatible=true
-- ConfigMatrix: parquet.enable.dictionary=false,true
statement
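The `-- Config:` header added to each of these SQL tests sets the same per-expression key that the Scala suites below build via `CometConf.getExprAllowIncompatConfigKey`. A rough self-contained sketch of that key derivation (hypothetical helper and local stand-in classes, not Comet's actual code):

```scala
// Hypothetical helper mirroring the documented key format:
//   spark.comet.expression.EXPRNAME.allowIncompatible
// where EXPRNAME is the Spark expression class's simple name.
def allowIncompatKey(exprClass: Class[_]): String =
  s"spark.comet.expression.${exprClass.getSimpleName}.allowIncompatible"

// Local stand-ins for the Spark expression classes named in this commit.
class Floor
class ArrayContains

println(allowIncompatKey(classOf[Floor]))
println(allowIncompatKey(classOf[ArrayContains]))
```

Deriving the key from the class name keeps the config namespace in sync with the serde registry, so newly marked expressions need no extra config plumbing.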
diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
index 65b2c8537..fb5531a57 100644
--- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
@@ -24,6 +24,7 @@ import scala.util.Random
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.CometTestBase
import org.apache.spark.sql.catalyst.expressions.{ArrayAppend, ArrayDistinct, ArrayExcept, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayRepeat, ArraysOverlap, ArrayUnion}
+import org.apache.spark.sql.catalyst.expressions.{ArrayContains, ArrayRemove}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
@@ -37,50 +37,57 @@ import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOpti
class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
  test("array_remove - integer") {
-    Seq(true, false).foreach { dictionaryEnabled =>
-      withTempView("t1") {
-        withTempDir { dir =>
-          val path = new Path(dir.toURI.toString, "test.parquet")
-          makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled, 10000)
-          spark.read.parquet(path.toString).createOrReplaceTempView("t1")
-          checkSparkAnswerAndOperator(
-            sql("SELECT array_remove(array(_2, _3,_4), _2) from t1 where _2 is null"))
-          checkSparkAnswerAndOperator(
-            sql("SELECT array_remove(array(_2, _3,_4), _3) from t1 where _3 is not null"))
-          checkSparkAnswerAndOperator(sql(
-            "SELECT array_remove(case when _2 = _3 THEN array(_2, _3,_4) ELSE null END, _3) from t1"))
+    withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[ArrayRemove]) -> "true") {
+      Seq(true, false).foreach { dictionaryEnabled =>
+        withTempView("t1") {
+          withTempDir { dir =>
+            val path = new Path(dir.toURI.toString, "test.parquet")
+            makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled, 10000)
+            spark.read.parquet(path.toString).createOrReplaceTempView("t1")
+            checkSparkAnswerAndOperator(
+              sql("SELECT array_remove(array(_2, _3,_4), _2) from t1 where _2 is null"))
+            checkSparkAnswerAndOperator(
+              sql("SELECT array_remove(array(_2, _3,_4), _3) from t1 where _3 is not null"))
+            checkSparkAnswerAndOperator(sql(
+              "SELECT array_remove(case when _2 = _3 THEN array(_2, _3,_4) ELSE null END, _3) from t1"))
+          }
        }
      }
    }
  }
test("array_remove - test all types (native Parquet reader)") {
- withTempDir { dir =>
- withTempView("t1") {
- val path = new Path(dir.toURI.toString, "test.parquet")
- val filename = path.toString
- val random = new Random(42)
- withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
- ParquetGenerator.makeParquetFile(
- random,
- spark,
- filename,
- 100,
- SchemaGenOptions(generateArray = false, generateStruct = false, generateMap = false),
- DataGenOptions(allowNull = true, generateNegativeZero = true))
- }
- val table = spark.read.parquet(filename)
- table.createOrReplaceTempView("t1")
- // test with array of each column
- val fieldNames =
- table.schema.fields
- .filter(field => CometArrayRemove.isTypeSupported(field.dataType))
- .map(_.name)
- for (fieldName <- fieldNames) {
- sql(s"SELECT array($fieldName, $fieldName) as a, $fieldName as b FROM t1")
- .createOrReplaceTempView("t2")
- val df = sql("SELECT array_remove(a, b) FROM t2")
- checkSparkAnswerAndOperator(df)
+ withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[ArrayRemove]) -> "true") {
+ withTempDir { dir =>
+ withTempView("t1") {
+ val path = new Path(dir.toURI.toString, "test.parquet")
+ val filename = path.toString
+ val random = new Random(42)
+ withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
+ ParquetGenerator.makeParquetFile(
+ random,
+ spark,
+ filename,
+ 100,
+ SchemaGenOptions(
+ generateArray = false,
+ generateStruct = false,
+ generateMap = false),
+ DataGenOptions(allowNull = true, generateNegativeZero = true))
+ }
+ val table = spark.read.parquet(filename)
+ table.createOrReplaceTempView("t1")
+ // test with array of each column
+ val fieldNames =
+ table.schema.fields
+ .filter(field => CometArrayRemove.isTypeSupported(field.dataType))
+ .map(_.name)
+ for (fieldName <- fieldNames) {
+ sql(s"SELECT array($fieldName, $fieldName) as a, $fieldName as b FROM t1")
+ .createOrReplaceTempView("t2")
+ val df = sql("SELECT array_remove(a, b) FROM t2")
+ checkSparkAnswerAndOperator(df)
+ }
}
}
}
@@ -129,7 +137,7 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
sql("SELECT array(struct(_1, _2)) as a, struct(_1, _2) as b FROM t1")
.createOrReplaceTempView("t2")
val expectedFallbackReason =
- "data type not supported: ArrayType(StructType(StructField(_1,BooleanType,true),StructField(_2,ByteType,true)),false)"
+ "is not fully compatible with Spark"
checkSparkAnswerAndFallbackReason(
sql("SELECT array_remove(a, b) FROM t2"),
expectedFallbackReason)
@@ -245,54 +253,59 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
}
test("array_contains - int values") {
- withTempDir { dir =>
- withTempView("t1") {
- val path = new Path(dir.toURI.toString, "test.parquet")
- makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = false, n = 10000)
- spark.read.parquet(path.toString).createOrReplaceTempView("t1");
- checkSparkAnswerAndOperator(
- spark.sql("SELECT array_contains(array(_2, _3, _4), _2) FROM t1"))
- checkSparkAnswerAndOperator(
- spark.sql("SELECT array_contains((CASE WHEN _2 =_3 THEN array(_4) END), _4) FROM t1"));
+ withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[ArrayContains]) -> "true") {
+ withTempDir { dir =>
+ withTempView("t1") {
+ val path = new Path(dir.toURI.toString, "test.parquet")
+ makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = false, n = 10000)
+ spark.read.parquet(path.toString).createOrReplaceTempView("t1");
+ checkSparkAnswerAndOperator(
+ spark.sql("SELECT array_contains(array(_2, _3, _4), _2) FROM t1"))
+ checkSparkAnswerAndOperator(
+ spark.sql(
+ "SELECT array_contains((CASE WHEN _2 =_3 THEN array(_4) END), _4) FROM t1"));
+ }
}
}
}
test("array_contains - test all types (native Parquet reader)") {
- withTempDir { dir =>
- withTempView("t1", "t2", "t3") {
- val path = new Path(dir.toURI.toString, "test.parquet")
- val filename = path.toString
- val random = new Random(42)
- withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
- ParquetGenerator.makeParquetFile(
- random,
- spark,
- filename,
- 100,
- SchemaGenOptions(generateArray = true, generateStruct = true, generateMap = false),
- DataGenOptions(allowNull = true, generateNegativeZero = true))
- }
- val table = spark.read.parquet(filename)
- table.createOrReplaceTempView("t1")
- val complexTypeFields =
- table.schema.fields.filter(field => isComplexType(field.dataType))
- val primitiveTypeFields =
- table.schema.fields.filterNot(field => isComplexType(field.dataType))
- for (field <- primitiveTypeFields) {
- val fieldName = field.name
- val typeName = field.dataType.typeName
- sql(s"SELECT array($fieldName, $fieldName) as a, $fieldName as b FROM t1")
- .createOrReplaceTempView("t2")
- checkSparkAnswerAndOperator(sql("SELECT array_contains(a, b) FROM t2"))
- checkSparkAnswerAndOperator(
- sql(s"SELECT array_contains(a, cast(null as $typeName)) FROM t2"))
- }
- for (field <- complexTypeFields) {
- val fieldName = field.name
- sql(s"SELECT array($fieldName, $fieldName) as a, $fieldName as b FROM t1")
- .createOrReplaceTempView("t3")
- checkSparkAnswer(sql("SELECT array_contains(a, b) FROM t3"))
+ withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[ArrayContains]) -> "true") {
+ withTempDir { dir =>
+ withTempView("t1", "t2", "t3") {
+ val path = new Path(dir.toURI.toString, "test.parquet")
+ val filename = path.toString
+ val random = new Random(42)
+ withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
+ ParquetGenerator.makeParquetFile(
+ random,
+ spark,
+ filename,
+ 100,
+ SchemaGenOptions(generateArray = true, generateStruct = true, generateMap = false),
+ DataGenOptions(allowNull = true, generateNegativeZero = true))
+ }
+ val table = spark.read.parquet(filename)
+ table.createOrReplaceTempView("t1")
+ val complexTypeFields =
+ table.schema.fields.filter(field => isComplexType(field.dataType))
+ val primitiveTypeFields =
+ table.schema.fields.filterNot(field => isComplexType(field.dataType))
+ for (field <- primitiveTypeFields) {
+ val fieldName = field.name
+ val typeName = field.dataType.typeName
+ sql(s"SELECT array($fieldName, $fieldName) as a, $fieldName as b FROM t1")
+ .createOrReplaceTempView("t2")
+ checkSparkAnswerAndOperator(sql("SELECT array_contains(a, b) FROM t2"))
+ checkSparkAnswerAndOperator(
+ sql(s"SELECT array_contains(a, cast(null as $typeName)) FROM t2"))
+ }
+ for (field <- complexTypeFields) {
+ val fieldName = field.name
+ sql(s"SELECT array($fieldName, $fieldName) as a, $fieldName as b FROM t1")
+ .createOrReplaceTempView("t3")
+ checkSparkAnswer(sql("SELECT array_contains(a, b) FROM t3"))
+ }
}
}
}
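A note on the config keys these tests toggle: the `spark.comet.expression.Floor.allowIncompatible=true` line near the top of this patch suggests that `CometConf.getExprAllowIncompatConfigKey` derives the key from the expression class's simple name. A minimal standalone sketch of that assumed derivation (the object name here is illustrative, not Comet's actual implementation):

```scala
// Sketch only: assumed shape of the per-expression allow-incompatible config
// key, matching the spark.comet.expression.Floor.allowIncompatible key seen
// earlier in this patch.
object AllowIncompatKeySketch {
  // Derive the key from a class's simple name, e.g. Floor -> "spark.comet.expression.Floor.allowIncompatible".
  def keyFor(cls: Class[_]): String =
    s"spark.comet.expression.${cls.getSimpleName}.allowIncompatible"
}
```

For example, `AllowIncompatKeySketch.keyFor(classOf[Thread])` (a JDK class used purely as a stand-in) yields `spark.comet.expression.Thread.allowIncompatible`; the real helper presumably does the equivalent for Spark expression classes such as `ArrayRemove` or `ArrayContains`.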
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 570db1795..eeaf1ed91 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -28,7 +28,7 @@ import org.scalatest.Tag
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, FromUnixTime, Literal, TruncDate, TruncTimestamp}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, Ceil, Floor, FromUnixTime, GetArrayItem, Literal, StructsToJson, Tan, TruncDate, TruncTimestamp}
import org.apache.spark.sql.catalyst.optimizer.SimplifyExtractValueOps
import org.apache.spark.sql.comet.CometProjectExec
import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
@@ -666,34 +666,36 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
test("date_trunc") {
- Seq(true, false).foreach { dictionaryEnabled =>
- withTempDir { dir =>
- val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet")
- makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000)
- withParquetTable(path.toString, "timetbl") {
- Seq(
- "YEAR",
- "YYYY",
- "YY",
- "MON",
- "MONTH",
- "MM",
- "QUARTER",
- "WEEK",
- "DAY",
- "DD",
- "HOUR",
- "MINUTE",
- "SECOND",
- "MILLISECOND",
- "MICROSECOND").foreach { format =>
- checkSparkAnswerAndOperator(
- "SELECT " +
- s"date_trunc('$format', _0), " +
- s"date_trunc('$format', _1), " +
- s"date_trunc('$format', _2), " +
- s"date_trunc('$format', _4) " +
- " from timetbl")
+ withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[TruncTimestamp]) -> "true") {
+ Seq(true, false).foreach { dictionaryEnabled =>
+ withTempDir { dir =>
+ val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet")
+ makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000)
+ withParquetTable(path.toString, "timetbl") {
+ Seq(
+ "YEAR",
+ "YYYY",
+ "YY",
+ "MON",
+ "MONTH",
+ "MM",
+ "QUARTER",
+ "WEEK",
+ "DAY",
+ "DD",
+ "HOUR",
+ "MINUTE",
+ "SECOND",
+ "MILLISECOND",
+ "MICROSECOND").foreach { format =>
+ checkSparkAnswerAndOperator(
+ "SELECT " +
+ s"date_trunc('$format', _0), " +
+ s"date_trunc('$format', _1), " +
+ s"date_trunc('$format', _2), " +
+ s"date_trunc('$format', _4) " +
+ " from timetbl")
+ }
}
}
}
@@ -701,7 +703,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
test("date_trunc with timestamp_ntz") {
- withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[Cast]) -> "true") {
+ withSQLConf(
+ CometConf.getExprAllowIncompatConfigKey(classOf[Cast]) -> "true",
+ CometConf.getExprAllowIncompatConfigKey(classOf[TruncTimestamp]) -> "true") {
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet")
@@ -770,7 +774,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
Seq(false, true).foreach { conversionEnabled =>
withSQLConf(
SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96",
- SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key -> conversionEnabled.toString) {
+ SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key -> conversionEnabled.toString,
+ CometConf.getExprAllowIncompatConfigKey(classOf[TruncTimestamp]) -> "true") {
withTempPath { path =>
Seq
.tabulate(N)(_ => ts)
@@ -1312,39 +1317,41 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
test("various math scalar functions") {
val data = doubleValues.map(n => (n, n))
- withParquetTable(data, "tbl") {
- // expressions with single arg
- for (expr <- Seq(
- "acos",
- "asin",
- "atan",
- "cos",
- "cosh",
- "exp",
- "ln",
- "log10",
- "log2",
- "sin",
- "sinh",
- "sqrt",
- "tan",
- "tanh",
- "cot")) {
- val (_, cometPlan) =
- checkSparkAnswerAndOperatorWithTol(sql(s"SELECT $expr(_1), $expr(_2) FROM tbl"))
- val cometProjectExecs = collect(cometPlan) { case op: CometProjectExec =>
- op
- }
- assert(cometProjectExecs.length == 1, expr)
- }
- // expressions with two args
- for (expr <- Seq("atan2", "pow")) {
- val (_, cometPlan) =
- checkSparkAnswerAndOperatorWithTol(sql(s"SELECT $expr(_1, _2) FROM tbl"))
- val cometProjectExecs = collect(cometPlan) { case op: CometProjectExec =>
- op
- }
- assert(cometProjectExecs.length == 1, expr)
+ withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[Tan]) -> "true") {
+ withParquetTable(data, "tbl") {
+ // expressions with single arg
+ for (expr <- Seq(
+ "acos",
+ "asin",
+ "atan",
+ "cos",
+ "cosh",
+ "exp",
+ "ln",
+ "log10",
+ "log2",
+ "sin",
+ "sinh",
+ "sqrt",
+ "tan",
+ "tanh",
+ "cot")) {
+ val (_, cometPlan) =
+ checkSparkAnswerAndOperatorWithTol(sql(s"SELECT $expr(_1), $expr(_2) FROM tbl"))
+ val cometProjectExecs = collect(cometPlan) { case op: CometProjectExec =>
+ op
+ }
+ assert(cometProjectExecs.length == 1, expr)
+ }
+ // expressions with two args
+ for (expr <- Seq("atan2", "pow")) {
+ val (_, cometPlan) =
+ checkSparkAnswerAndOperatorWithTol(sql(s"SELECT $expr(_1, _2) FROM tbl"))
+ val cometProjectExecs = collect(cometPlan) { case op: CometProjectExec =>
+ op
+ }
+ assert(cometProjectExecs.length == 1, expr)
+ }
}
}
}
@@ -1408,7 +1415,10 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
test("ceil and floor") {
Seq("true", "false").foreach { dictionary =>
- withSQLConf("parquet.enable.dictionary" -> dictionary) {
+ withSQLConf(
+ "parquet.enable.dictionary" -> dictionary,
+ CometConf.getExprAllowIncompatConfigKey(classOf[Ceil]) -> "true",
+ CometConf.getExprAllowIncompatConfigKey(classOf[Floor]) -> "true") {
withParquetTable(
(-5 until 5).map(i => (i.toDouble + 0.3, i.toDouble + 0.8)),
"tbl",
@@ -2199,24 +2209,26 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
test("to_json") {
// TODO fix for Spark 4.0.0
assume(!isSpark40Plus)
- Seq(true, false).foreach { dictionaryEnabled =>
- withParquetTable(
- (0 until 100).map(i => {
- val str = if (i % 2 == 0) {
- "even"
- } else {
- "odd"
- }
- (i.toByte, i.toShort, i, i.toLong, i * 1.2f, -i * 1.2d, str, i.toString)
- }),
- "tbl",
- withDictionary = dictionaryEnabled) {
+ withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[StructsToJson]) -> "true") {
+ Seq(true, false).foreach { dictionaryEnabled =>
+ withParquetTable(
+ (0 until 100).map(i => {
+ val str = if (i % 2 == 0) {
+ "even"
+ } else {
+ "odd"
+ }
+ (i.toByte, i.toShort, i, i.toLong, i * 1.2f, -i * 1.2d, str, i.toString)
+ }),
+ "tbl",
+ withDictionary = dictionaryEnabled) {
- val fields = Range(1, 8).map(n => s"'col$n', _$n").mkString(", ")
+ val fields = Range(1, 8).map(n => s"'col$n', _$n").mkString(", ")
- checkSparkAnswerAndOperator(s"SELECT to_json(named_struct($fields)) FROM tbl")
- checkSparkAnswerAndOperator(
- s"SELECT to_json(named_struct('nested', named_struct($fields))) FROM tbl")
+ checkSparkAnswerAndOperator(s"SELECT to_json(named_struct($fields)) FROM tbl")
+ checkSparkAnswerAndOperator(
+ s"SELECT to_json(named_struct('nested', named_struct($fields))) FROM tbl")
+ }
}
}
}
@@ -2224,28 +2236,30 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
test("to_json escaping of field names and string values") {
// TODO fix for Spark 4.0.0
assume(!isSpark40Plus)
- val gen = new DataGenerator(new Random(42))
- val chars = "\\'\"abc\t\r\n\f\b"
- Seq(true, false).foreach { dictionaryEnabled =>
- withParquetTable(
- (0 until 100).map(i => {
- val str1 = gen.generateString(chars, 8)
- val str2 = gen.generateString(chars, 8)
- (i.toString, str1, str2)
- }),
- "tbl",
- withDictionary = dictionaryEnabled) {
+ withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[StructsToJson]) -> "true") {
+ val gen = new DataGenerator(new Random(42))
+ val chars = "\\'\"abc\t\r\n\f\b"
+ Seq(true, false).foreach { dictionaryEnabled =>
+ withParquetTable(
+ (0 until 100).map(i => {
+ val str1 = gen.generateString(chars, 8)
+ val str2 = gen.generateString(chars, 8)
+ (i.toString, str1, str2)
+ }),
+ "tbl",
+ withDictionary = dictionaryEnabled) {
- val fields = Range(1, 3)
- .map(n => {
- val columnName = s"""column "$n""""
- s"'$columnName', _$n"
- })
- .mkString(", ")
+ val fields = Range(1, 3)
+ .map(n => {
+ val columnName = s"""column "$n""""
+ s"'$columnName', _$n"
+ })
+ .mkString(", ")
- checkSparkAnswerAndOperator(
- """SELECT 'column "1"' x, """ +
- s"to_json(named_struct($fields)) FROM tbl ORDER BY x")
+ checkSparkAnswerAndOperator(
+ """SELECT 'column "1"' x, """ +
+ s"to_json(named_struct($fields)) FROM tbl ORDER BY x")
+ }
}
}
}
@@ -2253,24 +2267,26 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
test("to_json unicode") {
// TODO fix for Spark 4.0.0
assume(!isSpark40Plus)
- Seq(true, false).foreach { dictionaryEnabled =>
- withParquetTable(
- (0 until 100).map(i => {
- (i.toString, "\uD83E\uDD11", "\u018F")
- }),
- "tbl",
- withDictionary = dictionaryEnabled) {
+ withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[StructsToJson]) -> "true") {
+ Seq(true, false).foreach { dictionaryEnabled =>
+ withParquetTable(
+ (0 until 100).map(i => {
+ (i.toString, "\uD83E\uDD11", "\u018F")
+ }),
+ "tbl",
+ withDictionary = dictionaryEnabled) {
- val fields = Range(1, 3)
- .map(n => {
- val columnName = s"""column "$n""""
- s"'$columnName', _$n"
- })
- .mkString(", ")
+ val fields = Range(1, 3)
+ .map(n => {
+ val columnName = s"""column "$n""""
+ s"'$columnName', _$n"
+ })
+ .mkString(", ")
- checkSparkAnswerAndOperator(
- """SELECT 'column "1"' x, """ +
- s"to_json(named_struct($fields)) FROM tbl ORDER BY x")
+ checkSparkAnswerAndOperator(
+ """SELECT 'column "1"' x, """ +
+ s"to_json(named_struct($fields)) FROM tbl ORDER BY x")
+ }
}
}
}
@@ -2571,7 +2587,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
withSQLConf(
SQLConf.ANSI_ENABLED.key -> ansiEnabled.toString(),
// Prevent the optimizer from collapsing an extract value of a create array
- SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> SimplifyExtractValueOps.ruleName) {
+ SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> SimplifyExtractValueOps.ruleName,
+ CometConf.getExprAllowIncompatConfigKey(classOf[GetArrayItem]) -> "true") {
val df = spark.read.parquet(path.toString)
val stringArray = df.select(array(col("_8"), col("_8"), lit(null)).alias("arr"))
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
index 14b5dc309..9426d1c84 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
@@ -24,6 +24,7 @@ import scala.util.Random
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
import org.apache.spark.sql.catalyst.expressions.Cast
+import org.apache.spark.sql.catalyst.expressions.aggregate.Corr
import org.apache.spark.sql.catalyst.optimizer.EliminateSorts
import org.apache.spark.sql.comet.CometHashAggregateExec
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -1319,7 +1320,9 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
test("covariance & correlation") {
- withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
+ withSQLConf(
+ CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
+ CometConf.getExprAllowIncompatConfigKey(classOf[Corr]) -> "true") {
Seq("jvm", "native").foreach { cometShuffleMode =>
withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> cometShuffleMode) {
Seq(true, false).foreach { dictionary =>
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala
index 84a326c79..05e82bdfb 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala
@@ -31,6 +31,8 @@ import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.comet.CometConf
class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper {
+ import org.apache.spark.sql.catalyst.expressions.GetArrayItem
+
override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
pos: Position): Unit = {
Seq(CometConf.SCAN_NATIVE_DATAFUSION, CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach(scan =>
@@ -40,7 +42,8 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper
SQLConf.USE_V1_SOURCE_LIST.key -> "parquet",
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "false",
- CometConf.COMET_NATIVE_SCAN_IMPL.key -> scan) {
+ CometConf.COMET_NATIVE_SCAN_IMPL.key -> scan,
+ CometConf.getExprAllowIncompatConfigKey(classOf[GetArrayItem]) -> "true") {
testFun
}
})
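All of the suites in this patch scope the new flags with `withSQLConf`, which sets the given entries, runs the body, and restores the previous values afterwards so the incompatible-expression opt-in never leaks into other tests. A minimal standalone sketch of that set-run-restore contract (an illustrative stand-in over a plain map, not Spark's actual test helper):

```scala
// Sketch of withSQLConf-style scoping: set key/value pairs, run the body,
// then restore the prior values even if the body throws.
object ScopedConfSketch {
  private val conf = scala.collection.mutable.Map[String, String]()

  def get(key: String): Option[String] = conf.get(key)

  def withConf[T](pairs: (String, String)*)(body: => T): T = {
    // Remember the previous value (or absence) of each key before overwriting.
    val saved = pairs.map { case (k, _) => k -> conf.get(k) }
    pairs.foreach { case (k, v) => conf(k) = v }
    try body
    finally saved.foreach {
      case (k, Some(old)) => conf(k) = old
      case (k, None) => conf.remove(k)
    }
  }
}
```

Inside the block a flag such as `spark.comet.expression.Tan.allowIncompatible` reads back as `"true"`; once the block exits, the key is absent again, mirroring why the tests above wrap entire bodies rather than setting the config globally.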
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index b9caa9430..09a2308e3 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -35,6 +35,7 @@ import org.apache.parquet.example.data.simple.SimpleGroup
import org.apache.parquet.schema.MessageTypeParser
import org.apache.spark.SparkException
import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
+import org.apache.spark.sql.catalyst.expressions.GetArrayItem
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -1505,7 +1506,9 @@ class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
withParquetTable(path.toUri.toString, "complex_types") {
Seq(CometConf.SCAN_NATIVE_DATAFUSION, CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach(
scanMode => {
- withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanMode) {
+ withSQLConf(
+ CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanMode,
+ CometConf.getExprAllowIncompatConfigKey(classOf[GetArrayItem]) -> "true") {
checkSparkAnswerAndOperator(sql("select * from complex_types"))
// First level
checkSparkAnswerAndOperator(sql(