This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch comet-parquet-exec
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/comet-parquet-exec by this
push:
new c6f49853 fix: [comet-parquet-exec] Fix timestamp cast errors (#1191)
c6f49853 is described below
commit c6f49853d534f585c8e7bdd6d0c8716248dd6c2e
Author: Andy Grove <[email protected]>
AuthorDate: Fri Dec 20 12:04:52 2024 -0700
fix: [comet-parquet-exec] Fix timestamp cast errors (#1191)
* fix timestamp cast
* fix imports
---
native/core/src/parquet/util/jni.rs | 1 -
native/spark-expr/src/cast.rs | 234 -------------------------------
native/spark-expr/src/schema_adapter.rs | 236 +++++++++++++++++++++++++++++++-
3 files changed, 233 insertions(+), 238 deletions(-)
diff --git a/native/core/src/parquet/util/jni.rs b/native/core/src/parquet/util/jni.rs
index 596277b3..d9fa4992 100644
--- a/native/core/src/parquet/util/jni.rs
+++ b/native/core/src/parquet/util/jni.rs
@@ -24,7 +24,6 @@ use jni::{
JNIEnv,
};
-use crate::execution::sort::RdxSort;
use arrow::error::ArrowError;
use arrow::ipc::reader::StreamReader;
use datafusion_execution::object_store::ObjectStoreUrl;
diff --git a/native/spark-expr/src/cast.rs b/native/spark-expr/src/cast.rs
index 09530c3c..08541e1d 100644
--- a/native/spark-expr/src/cast.rs
+++ b/native/spark-expr/src/cast.rs
@@ -138,240 +138,6 @@ pub struct Cast {
pub cast_options: SparkCastOptions,
}
-/// Determine if Comet supports a cast, taking options such as EvalMode and Timezone into account.
-pub fn cast_supported(
- from_type: &DataType,
- to_type: &DataType,
- options: &SparkCastOptions,
-) -> bool {
- use DataType::*;
-
- let from_type = if let Dictionary(_, dt) = from_type {
- dt
- } else {
- from_type
- };
-
- let to_type = if let Dictionary(_, dt) = to_type {
- dt
- } else {
- to_type
- };
-
- if from_type == to_type {
- return true;
- }
-
- match (from_type, to_type) {
- (Boolean, _) => can_cast_from_boolean(to_type, options),
- (UInt8 | UInt16 | UInt32 | UInt64, Int8 | Int16 | Int32 | Int64)
- if options.allow_cast_unsigned_ints =>
- {
- true
- }
- (Int8, _) => can_cast_from_byte(to_type, options),
- (Int16, _) => can_cast_from_short(to_type, options),
- (Int32, _) => can_cast_from_int(to_type, options),
- (Int64, _) => can_cast_from_long(to_type, options),
- (Float32, _) => can_cast_from_float(to_type, options),
- (Float64, _) => can_cast_from_double(to_type, options),
- (Decimal128(p, s), _) => can_cast_from_decimal(p, s, to_type, options),
- (Timestamp(_, None), _) => can_cast_from_timestamp_ntz(to_type, options),
- (Timestamp(_, Some(_)), _) => can_cast_from_timestamp(to_type, options),
- (Utf8 | LargeUtf8, _) => can_cast_from_string(to_type, options),
- (_, Utf8 | LargeUtf8) => can_cast_to_string(from_type, options),
- (Struct(from_fields), Struct(to_fields)) => from_fields
- .iter()
- .zip(to_fields.iter())
- .all(|(a, b)| cast_supported(a.data_type(), b.data_type(), options)),
- _ => false,
- }
-}
-
-fn can_cast_from_string(to_type: &DataType, options: &SparkCastOptions) -> bool {
- use DataType::*;
- match to_type {
- Boolean | Int8 | Int16 | Int32 | Int64 | Binary => true,
- Float32 | Float64 => {
- // https://github.com/apache/datafusion-comet/issues/326
- // Does not support inputs ending with 'd' or 'f'. Does not support 'inf'.
- // Does not support ANSI mode.
- options.allow_incompat
- }
- Decimal128(_, _) => {
- // https://github.com/apache/datafusion-comet/issues/325
- // Does not support inputs ending with 'd' or 'f'. Does not support 'inf'.
- // Does not support ANSI mode. Returns 0.0 instead of null if input contains no digits
-
- options.allow_incompat
- }
- Date32 | Date64 => {
- // https://github.com/apache/datafusion-comet/issues/327
- // Only supports years between 262143 BC and 262142 AD
- options.allow_incompat
- }
- Timestamp(_, _) if options.eval_mode == EvalMode::Ansi => {
- // ANSI mode not supported
- false
- }
- Timestamp(_, Some(tz)) if tz.as_ref() != "UTC" => {
- // Cast will use UTC instead of $timeZoneId
- options.allow_incompat
- }
- Timestamp(_, _) => {
- // https://github.com/apache/datafusion-comet/issues/328
- // Not all valid formats are supported
- options.allow_incompat
- }
- _ => false,
- }
-}
-
-fn can_cast_to_string(from_type: &DataType, options: &SparkCastOptions) -> bool {
- use DataType::*;
- match from_type {
- Boolean | Int8 | Int16 | Int32 | Int64 | Date32 | Date64 | Timestamp(_, _) => true,
- Float32 | Float64 => {
- // There can be differences in precision.
- // For example, the input \"1.4E-45\" will produce 1.0E-45 " +
- // instead of 1.4E-45"))
- true
- }
- Decimal128(_, _) => {
- // https://github.com/apache/datafusion-comet/issues/1068
- // There can be formatting differences in some case due to Spark using
- // scientific notation where Comet does not
- true
- }
- Binary => {
- // https://github.com/apache/datafusion-comet/issues/377
- // Only works for binary data representing valid UTF-8 strings
- options.allow_incompat
- }
- Struct(fields) => fields
- .iter()
- .all(|f| can_cast_to_string(f.data_type(), options)),
- _ => false,
- }
-}
-
-fn can_cast_from_timestamp_ntz(to_type: &DataType, options: &SparkCastOptions) -> bool {
- use DataType::*;
- match to_type {
- Timestamp(_, _) | Date32 | Date64 | Utf8 => {
- // incompatible
- options.allow_incompat
- }
- _ => {
- // unsupported
- false
- }
- }
-}
-
-fn can_cast_from_timestamp(to_type: &DataType, _options: &SparkCastOptions) -> bool {
- use DataType::*;
- match to_type {
- Boolean | Int8 | Int16 => {
- // https://github.com/apache/datafusion-comet/issues/352
- // this seems like an edge case that isn't important for us to support
- false
- }
- Int64 => {
- // https://github.com/apache/datafusion-comet/issues/352
- true
- }
- Date32 | Date64 | Utf8 | Decimal128(_, _) => true,
- _ => {
- // unsupported
- false
- }
- }
-}
-
-fn can_cast_from_boolean(to_type: &DataType, _: &SparkCastOptions) -> bool {
- use DataType::*;
- matches!(to_type, Int8 | Int16 | Int32 | Int64 | Float32 | Float64)
-}
-
-fn can_cast_from_byte(to_type: &DataType, _: &SparkCastOptions) -> bool {
- use DataType::*;
- matches!(
- to_type,
- Boolean | Int8 | Int16 | Int32 | Int64 | Float32 | Float64 | Decimal128(_, _)
- )
-}
-
-fn can_cast_from_short(to_type: &DataType, _: &SparkCastOptions) -> bool {
- use DataType::*;
- matches!(
- to_type,
- Boolean | Int8 | Int16 | Int32 | Int64 | Float32 | Float64 | Decimal128(_, _)
- )
-}
-
-fn can_cast_from_int(to_type: &DataType, options: &SparkCastOptions) -> bool {
- use DataType::*;
- match to_type {
- Boolean | Int8 | Int16 | Int32 | Int64 | Float32 | Float64 | Utf8 => true,
- Decimal128(_, _) => {
- // incompatible: no overflow check
- options.allow_incompat
- }
- _ => false,
- }
-}
-
-fn can_cast_from_long(to_type: &DataType, options: &SparkCastOptions) -> bool {
- use DataType::*;
- match to_type {
- Boolean | Int8 | Int16 | Int32 | Int64 | Float32 | Float64 => true,
- Decimal128(_, _) => {
- // incompatible: no overflow check
- options.allow_incompat
- }
- _ => false,
- }
-}
-
-fn can_cast_from_float(to_type: &DataType, _: &SparkCastOptions) -> bool {
- use DataType::*;
- matches!(
- to_type,
- Boolean | Int8 | Int16 | Int32 | Int64 | Float64 | Decimal128(_, _)
- )
-}
-
-fn can_cast_from_double(to_type: &DataType, _: &SparkCastOptions) -> bool {
- use DataType::*;
- matches!(
- to_type,
- Boolean | Int8 | Int16 | Int32 | Int64 | Float32 | Decimal128(_, _)
- )
-}
-
-fn can_cast_from_decimal(
- p1: &u8,
- _s1: &i8,
- to_type: &DataType,
- options: &SparkCastOptions,
-) -> bool {
- use DataType::*;
- match to_type {
- Int8 | Int16 | Int32 | Int64 | Float32 | Float64 => true,
- Decimal128(p2, _) => {
- if p2 < p1 {
- // https://github.com/apache/datafusion/issues/13492
- // Incompatible(Some("Casting to smaller precision is not supported"))
- options.allow_incompat
- } else {
- true
- }
- }
- _ => false,
- }
-}
-
macro_rules! cast_utf8_to_int {
($array:expr, $eval_mode:expr, $array_type:ty, $cast_method:ident) => {{
let len = $array.len();
diff --git a/native/spark-expr/src/schema_adapter.rs b/native/spark-expr/src/schema_adapter.rs
index 161ad6f1..9872b756 100644
--- a/native/spark-expr/src/schema_adapter.rs
+++ b/native/spark-expr/src/schema_adapter.rs
@@ -17,10 +17,9 @@
//! Custom schema adapter that uses Spark-compatible casts
-use crate::cast::cast_supported;
-use crate::{spark_cast, SparkCastOptions};
+use crate::{spark_cast, EvalMode, SparkCastOptions};
use arrow_array::{new_null_array, Array, RecordBatch, RecordBatchOptions};
-use arrow_schema::{Schema, SchemaRef};
+use arrow_schema::{DataType, Schema, SchemaRef};
use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
use datafusion_common::plan_err;
use datafusion_expr::ColumnarValue;
@@ -284,6 +283,237 @@ impl SchemaMapper for SchemaMapping {
}
}
+/// Determine if Comet supports a cast, taking options such as EvalMode and Timezone into account.
+fn cast_supported(from_type: &DataType, to_type: &DataType, options: &SparkCastOptions) -> bool {
+ use DataType::*;
+
+ let from_type = if let Dictionary(_, dt) = from_type {
+ dt
+ } else {
+ from_type
+ };
+
+ let to_type = if let Dictionary(_, dt) = to_type {
+ dt
+ } else {
+ to_type
+ };
+
+ if from_type == to_type {
+ return true;
+ }
+
+ match (from_type, to_type) {
+ (Boolean, _) => can_cast_from_boolean(to_type, options),
+ (UInt8 | UInt16 | UInt32 | UInt64, Int8 | Int16 | Int32 | Int64)
+ if options.allow_cast_unsigned_ints =>
+ {
+ true
+ }
+ (Int8, _) => can_cast_from_byte(to_type, options),
+ (Int16, _) => can_cast_from_short(to_type, options),
+ (Int32, _) => can_cast_from_int(to_type, options),
+ (Int64, _) => can_cast_from_long(to_type, options),
+ (Float32, _) => can_cast_from_float(to_type, options),
+ (Float64, _) => can_cast_from_double(to_type, options),
+ (Decimal128(p, s), _) => can_cast_from_decimal(p, s, to_type, options),
+ (Timestamp(_, None), _) => can_cast_from_timestamp_ntz(to_type, options),
+ (Timestamp(_, Some(_)), _) => can_cast_from_timestamp(to_type, options),
+ (Utf8 | LargeUtf8, _) => can_cast_from_string(to_type, options),
+ (_, Utf8 | LargeUtf8) => can_cast_to_string(from_type, options),
+ (Struct(from_fields), Struct(to_fields)) => from_fields
+ .iter()
+ .zip(to_fields.iter())
+ .all(|(a, b)| cast_supported(a.data_type(), b.data_type(), options)),
+ _ => false,
+ }
+}
+
+fn can_cast_from_string(to_type: &DataType, options: &SparkCastOptions) -> bool {
+ use DataType::*;
+ match to_type {
+ Boolean | Int8 | Int16 | Int32 | Int64 | Binary => true,
+ Float32 | Float64 => {
+ // https://github.com/apache/datafusion-comet/issues/326
+ // Does not support inputs ending with 'd' or 'f'. Does not support 'inf'.
+ // Does not support ANSI mode.
+ options.allow_incompat
+ }
+ Decimal128(_, _) => {
+ // https://github.com/apache/datafusion-comet/issues/325
+ // Does not support inputs ending with 'd' or 'f'. Does not support 'inf'.
+ // Does not support ANSI mode. Returns 0.0 instead of null if input contains no digits
+
+ options.allow_incompat
+ }
+ Date32 | Date64 => {
+ // https://github.com/apache/datafusion-comet/issues/327
+ // Only supports years between 262143 BC and 262142 AD
+ options.allow_incompat
+ }
+ Timestamp(_, _) if options.eval_mode == EvalMode::Ansi => {
+ // ANSI mode not supported
+ false
+ }
+ Timestamp(_, Some(tz)) if tz.as_ref() != "UTC" => {
+ // Cast will use UTC instead of $timeZoneId
+ options.allow_incompat
+ }
+ Timestamp(_, _) => {
+ // https://github.com/apache/datafusion-comet/issues/328
+ // Not all valid formats are supported
+ options.allow_incompat
+ }
+ _ => false,
+ }
+}
+
+fn can_cast_to_string(from_type: &DataType, options: &SparkCastOptions) -> bool {
+ use DataType::*;
+ match from_type {
+ Boolean | Int8 | Int16 | Int32 | Int64 | Date32 | Date64 | Timestamp(_, _) => true,
+ Float32 | Float64 => {
+ // There can be differences in precision.
+ // For example, the input \"1.4E-45\" will produce 1.0E-45 " +
+ // instead of 1.4E-45"))
+ true
+ }
+ Decimal128(_, _) => {
+ // https://github.com/apache/datafusion-comet/issues/1068
+ // There can be formatting differences in some case due to Spark using
+ // scientific notation where Comet does not
+ true
+ }
+ Binary => {
+ // https://github.com/apache/datafusion-comet/issues/377
+ // Only works for binary data representing valid UTF-8 strings
+ options.allow_incompat
+ }
+ Struct(fields) => fields
+ .iter()
+ .all(|f| can_cast_to_string(f.data_type(), options)),
+ _ => false,
+ }
+}
+
+fn can_cast_from_timestamp_ntz(to_type: &DataType, options: &SparkCastOptions) -> bool {
+ use DataType::*;
+ match to_type {
+ Timestamp(_, _) | Date32 | Date64 | Utf8 => {
+ // incompatible
+ options.allow_incompat
+ }
+ _ => {
+ // unsupported
+ false
+ }
+ }
+}
+
+fn can_cast_from_timestamp(to_type: &DataType, _options: &SparkCastOptions) -> bool {
+ use DataType::*;
+ match to_type {
+ Timestamp(_, _) => true,
+ Boolean | Int8 | Int16 => {
+ // https://github.com/apache/datafusion-comet/issues/352
+ // this seems like an edge case that isn't important for us to support
+ false
+ }
+ Int64 => {
+ // https://github.com/apache/datafusion-comet/issues/352
+ true
+ }
+ Date32 | Date64 | Utf8 | Decimal128(_, _) => true,
+ _ => {
+ // unsupported
+ false
+ }
+ }
+}
+
+fn can_cast_from_boolean(to_type: &DataType, _: &SparkCastOptions) -> bool {
+ use DataType::*;
+ matches!(to_type, Int8 | Int16 | Int32 | Int64 | Float32 | Float64)
+}
+
+fn can_cast_from_byte(to_type: &DataType, _: &SparkCastOptions) -> bool {
+ use DataType::*;
+ matches!(
+ to_type,
+ Boolean | Int8 | Int16 | Int32 | Int64 | Float32 | Float64 | Decimal128(_, _)
+ )
+}
+
+fn can_cast_from_short(to_type: &DataType, _: &SparkCastOptions) -> bool {
+ use DataType::*;
+ matches!(
+ to_type,
+ Boolean | Int8 | Int16 | Int32 | Int64 | Float32 | Float64 | Decimal128(_, _)
+ )
+}
+
+fn can_cast_from_int(to_type: &DataType, options: &SparkCastOptions) -> bool {
+ use DataType::*;
+ match to_type {
+ Boolean | Int8 | Int16 | Int32 | Int64 | Float32 | Float64 | Utf8 => true,
+ Decimal128(_, _) => {
+ // incompatible: no overflow check
+ options.allow_incompat
+ }
+ _ => false,
+ }
+}
+
+fn can_cast_from_long(to_type: &DataType, options: &SparkCastOptions) -> bool {
+ use DataType::*;
+ match to_type {
+ Boolean | Int8 | Int16 | Int32 | Int64 | Float32 | Float64 => true,
+ Decimal128(_, _) => {
+ // incompatible: no overflow check
+ options.allow_incompat
+ }
+ _ => false,
+ }
+}
+
+fn can_cast_from_float(to_type: &DataType, _: &SparkCastOptions) -> bool {
+ use DataType::*;
+ matches!(
+ to_type,
+ Boolean | Int8 | Int16 | Int32 | Int64 | Float64 | Decimal128(_, _)
+ )
+}
+
+fn can_cast_from_double(to_type: &DataType, _: &SparkCastOptions) -> bool {
+ use DataType::*;
+ matches!(
+ to_type,
+ Boolean | Int8 | Int16 | Int32 | Int64 | Float32 | Decimal128(_, _)
+ )
+}
+
+fn can_cast_from_decimal(
+ p1: &u8,
+ _s1: &i8,
+ to_type: &DataType,
+ options: &SparkCastOptions,
+) -> bool {
+ use DataType::*;
+ match to_type {
+ Int8 | Int16 | Int32 | Int64 | Float32 | Float64 => true,
+ Decimal128(p2, _) => {
+ if p2 < p1 {
+ // https://github.com/apache/datafusion/issues/13492
+ // Incompatible(Some("Casting to smaller precision is not supported"))
+ options.allow_incompat
+ } else {
+ true
+ }
+ }
+ _ => false,
+ }
+}
+
#[cfg(test)]
mod test {
use crate::test_common::file_util::get_temp_filename;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]