This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new ab1d30a3 chore: Move temporal kernels and expressions to spark-expr crate (#660)
ab1d30a3 is described below
commit ab1d30a3f04e52e5ef97802593200fb9306a0150
Author: Andy Grove <[email protected]>
AuthorDate: Mon Jul 15 11:47:44 2024 -0600
chore: Move temporal kernels and expressions to spark-expr crate (#660)
* Move temporal expressions to spark-expr crate
* reduce public api
* reduce public api
* update imports in benchmarks
* fmt
* remove unused dep
---
native/Cargo.lock | 1 -
native/core/Cargo.toml | 1 -
native/core/benches/cast_from_string.rs | 2 +-
native/core/benches/cast_numeric.rs | 2 +-
.../src/execution/datafusion/expressions/mod.rs | 2 -
.../src/execution/datafusion/expressions/utils.rs | 2 +-
native/core/src/execution/datafusion/planner.rs | 6 +-
native/core/src/execution/kernels/mod.rs | 1 -
.../execution => spark-expr/src}/kernels/mod.rs | 4 --
.../src}/kernels/temporal.rs | 71 +++++++++++-----------
native/spark-expr/src/lib.rs | 6 +-
.../expressions => spark-expr/src}/temporal.rs | 10 ++-
12 files changed, 49 insertions(+), 59 deletions(-)
diff --git a/native/Cargo.lock b/native/Cargo.lock
index f73f2862..649e137f 100644
--- a/native/Cargo.lock
+++ b/native/Cargo.lock
@@ -861,7 +861,6 @@ dependencies = [
"async-trait",
"brotli",
"bytes",
- "chrono",
"crc32fast",
"criterion",
"datafusion",
diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml
index c252fad6..90ead502 100644
--- a/native/core/Cargo.toml
+++ b/native/core/Cargo.toml
@@ -64,7 +64,6 @@ bytes = "1.5.0"
tempfile = "3.8.0"
ahash = { version = "0.8", default-features = false }
itertools = "0.11.0"
-chrono = { workspace = true }
paste = "1.0.14"
datafusion-common = { workspace = true }
datafusion = { workspace = true }
diff --git a/native/core/benches/cast_from_string.rs
b/native/core/benches/cast_from_string.rs
index 9a9ab18c..efc7987c 100644
--- a/native/core/benches/cast_from_string.rs
+++ b/native/core/benches/cast_from_string.rs
@@ -17,8 +17,8 @@
use arrow_array::{builder::StringBuilder, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
-use comet::execution::datafusion::expressions::{cast::Cast, EvalMode};
use criterion::{criterion_group, criterion_main, Criterion};
+use datafusion_comet_spark_expr::{Cast, EvalMode};
use datafusion_physical_expr::{expressions::Column, PhysicalExpr};
use std::sync::Arc;
diff --git a/native/core/benches/cast_numeric.rs
b/native/core/benches/cast_numeric.rs
index 35f24ce5..f9ed1fae 100644
--- a/native/core/benches/cast_numeric.rs
+++ b/native/core/benches/cast_numeric.rs
@@ -17,8 +17,8 @@
use arrow_array::{builder::Int32Builder, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
-use comet::execution::datafusion::expressions::{cast::Cast, EvalMode};
use criterion::{criterion_group, criterion_main, Criterion};
+use datafusion_comet_spark_expr::{Cast, EvalMode};
use datafusion_physical_expr::{expressions::Column, PhysicalExpr};
use std::sync::Arc;
diff --git a/native/core/src/execution/datafusion/expressions/mod.rs
b/native/core/src/execution/datafusion/expressions/mod.rs
index f6fb26b6..3c0a5b26 100644
--- a/native/core/src/execution/datafusion/expressions/mod.rs
+++ b/native/core/src/execution/datafusion/expressions/mod.rs
@@ -18,7 +18,6 @@
//! Native DataFusion expressions
pub mod bitwise_not;
-pub use datafusion_comet_spark_expr::cast;
pub mod checkoverflow;
mod normalize_nan;
pub mod scalar_funcs;
@@ -37,7 +36,6 @@ pub mod stddev;
pub mod strings;
pub mod subquery;
pub mod sum_decimal;
-pub mod temporal;
pub mod unbound;
mod utils;
pub mod variance;
diff --git a/native/core/src/execution/datafusion/expressions/utils.rs
b/native/core/src/execution/datafusion/expressions/utils.rs
index d253b251..540fca86 100644
--- a/native/core/src/execution/datafusion/expressions/utils.rs
+++ b/native/core/src/execution/datafusion/expressions/utils.rs
@@ -16,4 +16,4 @@
// under the License.
// re-export for legacy reasons
-pub use datafusion_comet_spark_expr::utils::{array_with_timezone,
down_cast_any_ref};
+pub use datafusion_comet_spark_expr::utils::down_cast_any_ref;
diff --git a/native/core/src/execution/datafusion/planner.rs
b/native/core/src/execution/datafusion/planner.rs
index 23960c30..7e638305 100644
--- a/native/core/src/execution/datafusion/planner.rs
+++ b/native/core/src/execution/datafusion/planner.rs
@@ -75,7 +75,6 @@ use crate::{
avg_decimal::AvgDecimal,
bitwise_not::BitwiseNotExpr,
bloom_filter_might_contain::BloomFilterMightContain,
- cast::Cast,
checkoverflow::CheckOverflow,
correlation::Correlation,
covariance::Covariance,
@@ -86,7 +85,6 @@ use crate::{
strings::{Contains, EndsWith, Like, StartsWith,
StringSpaceExec, SubstringExec},
subquery::Subquery,
sum_decimal::SumDecimal,
- temporal::{DateTruncExec, HourExec, MinuteExec, SecondExec,
TimestampTruncExec},
unbound::UnboundColumn,
variance::Variance,
NormalizeNaNAndZero,
@@ -107,7 +105,9 @@ use crate::{
};
use super::expressions::{create_named_struct::CreateNamedStruct, EvalMode};
-use datafusion_comet_spark_expr::{Abs, IfExpr};
+use datafusion_comet_spark_expr::{
+ Abs, Cast, DateTruncExec, HourExec, IfExpr, MinuteExec, SecondExec,
TimestampTruncExec,
+};
// For clippy error on type_complexity.
type ExecResult<T> = Result<T, ExecutionError>;
diff --git a/native/core/src/execution/kernels/mod.rs
b/native/core/src/execution/kernels/mod.rs
index 76d4e180..675dcd48 100644
--- a/native/core/src/execution/kernels/mod.rs
+++ b/native/core/src/execution/kernels/mod.rs
@@ -21,4 +21,3 @@ mod hash;
pub use hash::hash;
pub(crate) mod strings;
-pub(crate) mod temporal;
diff --git a/native/core/src/execution/kernels/mod.rs
b/native/spark-expr/src/kernels/mod.rs
similarity index 93%
copy from native/core/src/execution/kernels/mod.rs
copy to native/spark-expr/src/kernels/mod.rs
index 76d4e180..88aa34b1 100644
--- a/native/core/src/execution/kernels/mod.rs
+++ b/native/spark-expr/src/kernels/mod.rs
@@ -17,8 +17,4 @@
//! Kernels
-mod hash;
-pub use hash::hash;
-
-pub(crate) mod strings;
pub(crate) mod temporal;
diff --git a/native/core/src/execution/kernels/temporal.rs
b/native/spark-expr/src/kernels/temporal.rs
similarity index 95%
rename from native/core/src/execution/kernels/temporal.rs
rename to native/spark-expr/src/kernels/temporal.rs
index 9cf35af1..6f2474e8 100644
--- a/native/core/src/execution/kernels/temporal.rs
+++ b/native/spark-expr/src/kernels/temporal.rs
@@ -32,24 +32,18 @@ use arrow_array::{
use arrow_schema::TimeUnit;
-use crate::errors::ExpressionError;
+use crate::SparkError;
// Copied from arrow_arith/temporal.rs
macro_rules! return_compute_error_with {
($msg:expr, $param:expr) => {
- return {
- Err(ExpressionError::ArrowError(format!(
- "{}: {:?}",
- $msg, $param
- )))
- }
+ return { Err(SparkError::Internal(format!("{}: {:?}", $msg, $param))) }
};
}
// The number of days between the beginning of the proleptic gregorian calendar (0001-01-01)
// and the beginning of the Unix Epoch (1970-01-01)
const DAYS_TO_UNIX_EPOCH: i32 = 719_163;
-const MICROS_TO_UNIX_EPOCH: i64 = 62_167_132_800 * 1_000_000;
// Copied from arrow_arith/temporal.rs with modification to the output datatype
// Transforms a array of NaiveDate to an array of Date32 after applying an operation
@@ -102,7 +96,7 @@ fn as_timestamp_tz_with_op<A: ArrayAccessor<Item =
T::Native>, T: ArrowTemporalT
mut builder: PrimitiveBuilder<TimestampMicrosecondType>,
tz: &str,
op: F,
-) -> Result<TimestampMicrosecondArray, ExpressionError>
+) -> Result<TimestampMicrosecondArray, SparkError>
where
F: Fn(DateTime<Tz>) -> i64,
i64: From<T::Native>,
@@ -113,7 +107,7 @@ where
Some(value) => match as_datetime_with_timezone::<T>(value.into(),
tz) {
Some(time) => builder.append_value(op(time)),
_ => {
- return Err(ExpressionError::ArrowError(
+ return Err(SparkError::Internal(
"Unable to read value as datetime".to_string(),
));
}
@@ -129,7 +123,7 @@ fn as_timestamp_tz_with_op_single<T: ArrowTemporalType, F>(
builder: &mut PrimitiveBuilder<TimestampMicrosecondType>,
tz: &Tz,
op: F,
-) -> Result<(), ExpressionError>
+) -> Result<(), SparkError>
where
F: Fn(DateTime<Tz>) -> i64,
i64: From<T::Native>,
@@ -138,7 +132,7 @@ where
Some(value) => match as_datetime_with_timezone::<T>(value.into(), *tz)
{
Some(time) => builder.append_value(op(time)),
_ => {
- return Err(ExpressionError::ArrowError(
+ return Err(SparkError::Internal(
"Unable to read value as datetime".to_string(),
));
}
@@ -256,7 +250,7 @@ fn trunc_date_to_microsec<T: Timelike>(dt: T) -> Option<T> {
/// array is an array of Date32 values. The array may be a dictionary array.
///
/// format is a scalar string specifying the format to apply to the timestamp value.
-pub fn date_trunc_dyn(array: &dyn Array, format: String) -> Result<ArrayRef,
ExpressionError> {
+pub(crate) fn date_trunc_dyn(array: &dyn Array, format: String) ->
Result<ArrayRef, SparkError> {
match array.data_type().clone() {
DataType::Dictionary(_, _) => {
downcast_dictionary_array!(
@@ -279,10 +273,10 @@ pub fn date_trunc_dyn(array: &dyn Array, format: String)
-> Result<ArrayRef, Exp
}
}
-pub fn date_trunc<T>(
+pub(crate) fn date_trunc<T>(
array: &PrimitiveArray<T>,
format: String,
-) -> Result<Date32Array, ExpressionError>
+) -> Result<Date32Array, SparkError>
where
T: ArrowTemporalType + ArrowNumericType,
i64: From<T::Native>,
@@ -311,7 +305,7 @@ where
builder,
|dt| as_days_from_unix_epoch(trunc_date_to_week(dt)),
)),
- _ => Err(ExpressionError::ArrowError(format!(
+ _ => Err(SparkError::Internal(format!(
"Unsupported format: {:?} for function 'date_trunc'",
format
))),
@@ -331,10 +325,10 @@ where
///
/// format is an array of strings specifying the format to apply to the corresponding date value.
/// The array may be a dictionary array.
-pub fn date_trunc_array_fmt_dyn(
+pub(crate) fn date_trunc_array_fmt_dyn(
array: &dyn Array,
formats: &dyn Array,
-) -> Result<ArrayRef, ExpressionError> {
+) -> Result<ArrayRef, SparkError> {
match (array.data_type().clone(), formats.data_type().clone()) {
(DataType::Dictionary(_, v), DataType::Dictionary(_, f)) => {
if !matches!(*v, DataType::Date32) {
@@ -403,7 +397,7 @@ pub fn date_trunc_array_fmt_dyn(
.expect("Unexpected value type in formats"),
)
.map(|a| Arc::new(a) as ArrayRef),
- (dt, fmt) => Err(ExpressionError::ArrowError(format!(
+ (dt, fmt) => Err(SparkError::Internal(format!(
"Unsupported datatype: {:}, format: {:?} for function
'date_trunc'",
dt, fmt
))),
@@ -434,7 +428,7 @@ macro_rules! date_trunc_array_fmt_helper {
"WEEK" => Ok(as_datetime_with_op_single(val, &mut
builder, |dt| {
as_days_from_unix_epoch(trunc_date_to_week(dt))
})),
- _ => Err(ExpressionError::ArrowError(format!(
+ _ => Err(SparkError::Internal(format!(
"Unsupported format: {:?} for function
'date_trunc'",
$formats.value(index)
))),
@@ -454,7 +448,7 @@ macro_rules! date_trunc_array_fmt_helper {
fn date_trunc_array_fmt_plain_plain(
array: &Date32Array,
formats: &StringArray,
-) -> Result<Date32Array, ExpressionError>
+) -> Result<Date32Array, SparkError>
where
{
let data_type = array.data_type();
@@ -464,7 +458,7 @@ where
fn date_trunc_array_fmt_plain_dict<K>(
array: &Date32Array,
formats: &TypedDictionaryArray<K, StringArray>,
-) -> Result<Date32Array, ExpressionError>
+) -> Result<Date32Array, SparkError>
where
K: ArrowDictionaryKeyType,
{
@@ -475,7 +469,7 @@ where
fn date_trunc_array_fmt_dict_plain<K>(
array: &TypedDictionaryArray<K, Date32Array>,
formats: &StringArray,
-) -> Result<Date32Array, ExpressionError>
+) -> Result<Date32Array, SparkError>
where
K: ArrowDictionaryKeyType,
{
@@ -486,7 +480,7 @@ where
fn date_trunc_array_fmt_dict_dict<K, F>(
array: &TypedDictionaryArray<K, Date32Array>,
formats: &TypedDictionaryArray<F, StringArray>,
-) -> Result<Date32Array, ExpressionError>
+) -> Result<Date32Array, SparkError>
where
K: ArrowDictionaryKeyType,
F: ArrowDictionaryKeyType,
@@ -503,7 +497,10 @@ where
/// timezone or no timezone. The array may be a dictionary array.
///
/// format is a scalar string specifying the format to apply to the timestamp value.
-pub fn timestamp_trunc_dyn(array: &dyn Array, format: String) ->
Result<ArrayRef, ExpressionError> {
+pub(crate) fn timestamp_trunc_dyn(
+ array: &dyn Array,
+ format: String,
+) -> Result<ArrayRef, SparkError> {
match array.data_type().clone() {
DataType::Dictionary(_, _) => {
downcast_dictionary_array!(
@@ -526,10 +523,10 @@ pub fn timestamp_trunc_dyn(array: &dyn Array, format:
String) -> Result<ArrayRef
}
}
-pub fn timestamp_trunc<T>(
+pub(crate) fn timestamp_trunc<T>(
array: &PrimitiveArray<T>,
format: String,
-) -> Result<TimestampMicrosecondArray, ExpressionError>
+) -> Result<TimestampMicrosecondArray, SparkError>
where
T: ArrowTemporalType + ArrowNumericType,
i64: From<T::Native>,
@@ -589,7 +586,7 @@ where
as_micros_from_unix_epoch_utc(trunc_date_to_microsec(dt))
})
}
- _ => Err(ExpressionError::ArrowError(format!(
+ _ => Err(SparkError::Internal(format!(
"Unsupported format: {:?} for function 'timestamp_trunc'",
format
))),
@@ -611,10 +608,10 @@ where
///
/// format is an array of strings specifying the format to apply to the corresponding timestamp
/// value. The array may be a dictionary array.
-pub fn timestamp_trunc_array_fmt_dyn(
+pub(crate) fn timestamp_trunc_array_fmt_dyn(
array: &dyn Array,
formats: &dyn Array,
-) -> Result<ArrayRef, ExpressionError> {
+) -> Result<ArrayRef, SparkError> {
match (array.data_type().clone(), formats.data_type().clone()) {
(DataType::Dictionary(_, _), DataType::Dictionary(_, _)) => {
downcast_dictionary_array!(
@@ -669,7 +666,7 @@ pub fn timestamp_trunc_array_fmt_dyn(
dt => return_compute_error_with!("timestamp_trunc does not
support", dt),
)
}
- (dt, fmt) => Err(ExpressionError::ArrowError(format!(
+ (dt, fmt) => Err(SparkError::Internal(format!(
"Unsupported datatype: {:}, format: {:?} for function
'timestamp_trunc'",
dt, fmt
))),
@@ -740,7 +737,7 @@ macro_rules! timestamp_trunc_array_fmt_helper {
as_micros_from_unix_epoch_utc(trunc_date_to_microsec(dt))
})
}
- _ => Err(ExpressionError::ArrowError(format!(
+ _ => Err(SparkError::Internal(format!(
"Unsupported format: {:?} for function
'timestamp_trunc'",
$formats.value(index)
))),
@@ -762,7 +759,7 @@ macro_rules! timestamp_trunc_array_fmt_helper {
fn timestamp_trunc_array_fmt_plain_plain<T>(
array: &PrimitiveArray<T>,
formats: &StringArray,
-) -> Result<TimestampMicrosecondArray, ExpressionError>
+) -> Result<TimestampMicrosecondArray, SparkError>
where
T: ArrowTemporalType + ArrowNumericType,
i64: From<T::Native>,
@@ -773,7 +770,7 @@ where
fn timestamp_trunc_array_fmt_plain_dict<T, K>(
array: &PrimitiveArray<T>,
formats: &TypedDictionaryArray<K, StringArray>,
-) -> Result<TimestampMicrosecondArray, ExpressionError>
+) -> Result<TimestampMicrosecondArray, SparkError>
where
T: ArrowTemporalType + ArrowNumericType,
i64: From<T::Native>,
@@ -786,7 +783,7 @@ where
fn timestamp_trunc_array_fmt_dict_plain<T, K>(
array: &TypedDictionaryArray<K, PrimitiveArray<T>>,
formats: &StringArray,
-) -> Result<TimestampMicrosecondArray, ExpressionError>
+) -> Result<TimestampMicrosecondArray, SparkError>
where
T: ArrowTemporalType + ArrowNumericType,
i64: From<T::Native>,
@@ -799,7 +796,7 @@ where
fn timestamp_trunc_array_fmt_dict_dict<T, K, F>(
array: &TypedDictionaryArray<K, PrimitiveArray<T>>,
formats: &TypedDictionaryArray<F, StringArray>,
-) -> Result<TimestampMicrosecondArray, ExpressionError>
+) -> Result<TimestampMicrosecondArray, SparkError>
where
T: ArrowTemporalType + ArrowNumericType,
i64: From<T::Native>,
@@ -812,7 +809,7 @@ where
#[cfg(test)]
mod tests {
- use crate::execution::kernels::temporal::{
+ use crate::kernels::temporal::{
date_trunc, date_trunc_array_fmt_dyn, timestamp_trunc,
timestamp_trunc_array_fmt_dyn,
};
use arrow_array::{
diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs
index 3c726f52..5168e0e8 100644
--- a/native/spark-expr/src/lib.rs
+++ b/native/spark-expr/src/lib.rs
@@ -16,16 +16,20 @@
// under the License.
mod abs;
-pub mod cast;
+mod cast;
mod error;
mod if_expr;
+mod kernels;
+mod temporal;
pub mod timezone;
pub mod utils;
pub use abs::Abs;
+pub use cast::Cast;
pub use error::{SparkError, SparkResult};
pub use if_expr::IfExpr;
+pub use temporal::{DateTruncExec, HourExec, MinuteExec, SecondExec,
TimestampTruncExec};
/// Spark supports three evaluation modes when evaluating expressions, which affect
/// the behavior when processing input values that are invalid or would result in an
diff --git a/native/core/src/execution/datafusion/expressions/temporal.rs
b/native/spark-expr/src/temporal.rs
similarity index 98%
rename from native/core/src/execution/datafusion/expressions/temporal.rs
rename to native/spark-expr/src/temporal.rs
index 69fbb791..ea30d338 100644
--- a/native/core/src/execution/datafusion/expressions/temporal.rs
+++ b/native/spark-expr/src/temporal.rs
@@ -31,12 +31,10 @@ use datafusion::logical_expr::ColumnarValue;
use datafusion_common::{DataFusionError, ScalarValue::Utf8};
use datafusion_physical_expr::PhysicalExpr;
-use crate::execution::{
- datafusion::expressions::utils::{array_with_timezone, down_cast_any_ref},
- kernels::temporal::{
- date_trunc_array_fmt_dyn, date_trunc_dyn,
timestamp_trunc_array_fmt_dyn,
- timestamp_trunc_dyn,
- },
+use crate::utils::{array_with_timezone, down_cast_any_ref};
+
+use crate::kernels::temporal::{
+ date_trunc_array_fmt_dyn, date_trunc_dyn, timestamp_trunc_array_fmt_dyn,
timestamp_trunc_dyn,
};
#[derive(Debug, Hash)]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]