This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-22315-5b22857036803a5f5f3ca7248b3a0af4615aa7e6 in repository https://gitbox.apache.org/repos/asf/datafusion.git
commit 9aebcea468ac8d47f9874dfea19839127b1bbd9d Author: Zhen Chen <[email protected]> AuthorDate: Thu Jun 4 17:59:53 2026 +0800 Fix date_bin overflow when scaling an extreme Timestamp(Second) source (#22315) ## Which issue does this PR close? - Closes #22211. ## Rationale for this change `date_bin` could panic during planning or constant evaluation when scaling a non-nanosecond source timestamp to nanoseconds overflowed. This change makes that path return a regular error instead of panicking. ## What changes are included in this PR? - Added checked overflow handling for source timestamp scaling in `date_bin`. - Returned an error for out-of-range source timestamp conversion instead of panicking. - Preserved existing `NULL` behavior for unrelated out-of-range `date_bin` cases. - Added a Rust unit test and sqllogictest coverage for the overflow case. ## Are these changes tested? Yes. Verified with: - `cargo test -p datafusion-functions test_date_bin --lib` - `cargo test -p datafusion-sqllogictest --test sqllogictests date_bin_errors` ## Are there any user-facing changes? Yes. Queries that previously could panic now return a normal error: `Execution error: DATE_BIN source timestamp ... 
cannot be represented in nanoseconds` --- datafusion/functions/src/datetime/date_bin.rs | 145 +++++++++++++-------- .../sqllogictest/test_files/date_bin_errors.slt | 21 ++- 2 files changed, 111 insertions(+), 55 deletions(-) diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs index c69e732c85..38b491e42b 100644 --- a/datafusion/functions/src/datetime/date_bin.rs +++ b/datafusion/functions/src/datetime/date_bin.rs @@ -31,9 +31,12 @@ use arrow::datatypes::{ DataType, Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType, TimeUnit, }; +use arrow::error::ArrowError; use arrow::temporal_conversions::NANOSECONDS_IN_DAY; use datafusion_common::cast::as_primitive_array; -use datafusion_common::{Result, ScalarValue, exec_err, not_impl_err, plan_err}; +use datafusion_common::{ + DataFusionError, Result, ScalarValue, exec_err, not_impl_err, plan_err, +}; use datafusion_expr::TypeSignature::Exact; use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; use datafusion_expr::{ @@ -322,7 +325,7 @@ impl Interval { // return time in nanoseconds that the source timestamp falls into based on the stride and origin fn date_bin_nanos_interval(stride_nanos: i64, source: i64, origin: i64) -> Result<i64> { let time_diff = source.checked_sub(origin).ok_or_else(|| { - arrow::error::ArrowError::InvalidArgumentError(format!( + ArrowError::InvalidArgumentError(format!( "date_bin source timestamp {source} - origin {origin} overflows i64" )) })?; @@ -331,7 +334,7 @@ fn date_bin_nanos_interval(stride_nanos: i64, source: i64, origin: i64) -> Resul let time_delta = compute_distance(time_diff, stride_nanos)?; origin.checked_add(time_delta).ok_or_else(|| { - arrow::error::ArrowError::InvalidArgumentError(format!( + ArrowError::InvalidArgumentError(format!( "date_bin origin {origin} + delta {time_delta} overflows i64" )) .into() @@ -341,12 +344,12 @@ fn date_bin_nanos_interval(stride_nanos: i64, 
source: i64, origin: i64) -> Resul // distance from origin to bin fn compute_distance(time_diff: i64, stride: i64) -> Result<i64> { let remainder = time_diff.checked_rem(stride).ok_or_else(|| { - arrow::error::ArrowError::InvalidArgumentError(format!( + ArrowError::InvalidArgumentError(format!( "date_bin compute_distance time_diff {time_diff} % stride {stride} overflows i64" )) })?; let time_delta = time_diff.checked_sub(remainder).ok_or_else(|| { - arrow::error::ArrowError::InvalidArgumentError(format!( + ArrowError::InvalidArgumentError(format!( "date_bin compute_distance time_diff {time_diff} - remainder {remainder} overflows i64" )) })?; @@ -354,7 +357,7 @@ fn compute_distance(time_diff: i64, stride: i64) -> Result<i64> { if time_diff < 0 && stride > 1 && time_delta != time_diff { // The origin is later than the source timestamp, round down to the previous bin time_delta.checked_sub(stride).ok_or_else(|| { - arrow::error::ArrowError::InvalidArgumentError(format!( + ArrowError::InvalidArgumentError(format!( "date_bin compute_distance time_delta {time_delta} - stride {stride} overflows i64" )) .into() @@ -594,53 +597,91 @@ fn date_bin_impl( return exec_err!("DATE_BIN stride must be non-zero"); } - fn stride_map_fn<T: ArrowTimestampType>( - origin: i64, - stride: i64, - stride_fn: BinFunction, - ) -> impl Fn(i64) -> Result<i64> { - let scale = match T::UNIT { + fn timestamp_scale<T: ArrowTimestampType>() -> i64 { + match T::UNIT { Nanosecond => 1, Microsecond => NANOS_PER_MICRO, Millisecond => NANOS_PER_MILLI, Second => NANOSECONDS, - }; - move |x: i64| match stride_fn(stride, x * scale, origin) { - Ok(result) => Ok(result / scale), - Err(e) => Err(e), } } + fn timestamp_scale_overflow_error(x: i64) -> DataFusionError { + DataFusionError::Execution(format!( + "DATE_BIN source timestamp {x} cannot be represented in nanoseconds" + )) + } + Ok(match array { ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => { - let apply_stride_fn = - 
stride_map_fn::<TimestampNanosecondType>(origin, stride, stride_fn); + let scale = timestamp_scale::<TimestampNanosecondType>(); ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( - v.and_then(|val| apply_stride_fn(val).ok()), + match *v { + Some(val) => { + let scaled = val + .checked_mul(scale) + .ok_or_else(|| timestamp_scale_overflow_error(val))?; + match stride_fn(stride, scaled, origin) { + Ok(result) => Some(result / scale), + Err(_) => None, + } + } + None => None, + }, tz_opt.clone(), )) } ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, tz_opt)) => { - let apply_stride_fn = - stride_map_fn::<TimestampMicrosecondType>(origin, stride, stride_fn); + let scale = timestamp_scale::<TimestampMicrosecondType>(); ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( - v.and_then(|val| apply_stride_fn(val).ok()), + match *v { + Some(val) => { + let scaled = val + .checked_mul(scale) + .ok_or_else(|| timestamp_scale_overflow_error(val))?; + match stride_fn(stride, scaled, origin) { + Ok(result) => Some(result / scale), + Err(_) => None, + } + } + None => None, + }, tz_opt.clone(), )) } ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(v, tz_opt)) => { - let apply_stride_fn = - stride_map_fn::<TimestampMillisecondType>(origin, stride, stride_fn); + let scale = timestamp_scale::<TimestampMillisecondType>(); ColumnarValue::Scalar(ScalarValue::TimestampMillisecond( - v.and_then(|val| apply_stride_fn(val).ok()), + match *v { + Some(val) => { + let scaled = val + .checked_mul(scale) + .ok_or_else(|| timestamp_scale_overflow_error(val))?; + match stride_fn(stride, scaled, origin) { + Ok(result) => Some(result / scale), + Err(_) => None, + } + } + None => None, + }, tz_opt.clone(), )) } ColumnarValue::Scalar(ScalarValue::TimestampSecond(v, tz_opt)) => { - let apply_stride_fn = - stride_map_fn::<TimestampSecondType>(origin, stride, stride_fn); + let scale = timestamp_scale::<TimestampSecondType>(); ColumnarValue::Scalar(ScalarValue::TimestampSecond( - 
v.and_then(|val| apply_stride_fn(val).ok()), + match *v { + Some(val) => { + let scaled = val + .checked_mul(scale) + .ok_or_else(|| timestamp_scale_overflow_error(val))?; + match stride_fn(stride, scaled, origin) { + Ok(result) => Some(result / scale), + Err(_) => None, + } + } + None => None, + }, tz_opt.clone(), )) } @@ -710,20 +751,24 @@ fn date_bin_impl( T: ArrowTimestampType, { let array = as_primitive_array::<T>(array)?; - let scale = match T::UNIT { - Nanosecond => 1, - Microsecond => NANOS_PER_MICRO, - Millisecond => NANOS_PER_MILLI, - Second => NANOSECONDS, - }; - - let result: PrimitiveArray<T> = array.try_unary(|val| { - stride_fn(stride, val * scale, origin) - .map(|binned| binned / scale) - .map_err(|e| { - arrow::error::ArrowError::ComputeError(e.to_string()) - }) - })?; + let scale = timestamp_scale::<T>(); + + let values = array + .iter() + .map(|val| match val { + Some(val) => { + let scaled = val + .checked_mul(scale) + .ok_or_else(|| timestamp_scale_overflow_error(val))?; + Ok(stride_fn(stride, scaled, origin) + .ok() + .map(|binned| binned / scale)) + } + None => Ok(None), + }) + .collect::<Result<Vec<_>>>()?; + + let result = PrimitiveArray::<T>::from_iter(values); let array = result.with_timezone_opt(tz_opt.clone()); Ok(ColumnarValue::Array(Arc::new(array))) @@ -764,9 +809,7 @@ fn date_bin_impl( let nanos = binned_nanos % (NANOSECONDS_IN_DAY); (nanos / NANOS_PER_MILLI) as i32 }) - .map_err(|e| { - arrow::error::ArrowError::ComputeError(e.to_string()) - }) + .map_err(|e| ArrowError::ComputeError(e.to_string())) })?; ColumnarValue::Array(Arc::new(result)) } @@ -784,9 +827,7 @@ fn date_bin_impl( let nanos = binned_nanos % (NANOSECONDS_IN_DAY); (nanos / NANOS_PER_SEC) as i32 }) - .map_err(|e| { - arrow::error::ArrowError::ComputeError(e.to_string()) - }) + .map_err(|e| ArrowError::ComputeError(e.to_string())) })?; ColumnarValue::Array(Arc::new(result)) } @@ -804,9 +845,7 @@ fn date_bin_impl( let nanos = binned_nanos % (NANOSECONDS_IN_DAY); nanos 
/ NANOS_PER_MICRO }) - .map_err(|e| { - arrow::error::ArrowError::ComputeError(e.to_string()) - }) + .map_err(|e| ArrowError::ComputeError(e.to_string())) })?; ColumnarValue::Array(Arc::new(result)) } @@ -821,9 +860,7 @@ fn date_bin_impl( array.try_unary(|x| { stride_fn(stride, x, origin) .map(|binned_nanos| binned_nanos % (NANOSECONDS_IN_DAY)) - .map_err(|e| { - arrow::error::ArrowError::ComputeError(e.to_string()) - }) + .map_err(|e| ArrowError::ComputeError(e.to_string())) })?; ColumnarValue::Array(Arc::new(result)) } diff --git a/datafusion/sqllogictest/test_files/date_bin_errors.slt b/datafusion/sqllogictest/test_files/date_bin_errors.slt index ecb7e27d5f..20408c84ef 100644 --- a/datafusion/sqllogictest/test_files/date_bin_errors.slt +++ b/datafusion/sqllogictest/test_files/date_bin_errors.slt @@ -77,4 +77,23 @@ select date_bin( arrow_cast(-9223372036854775808, 'Timestamp(Nanosecond, None)') ); ---- -NULL \ No newline at end of file +NULL + +# Source timestamp scaling to nanoseconds overflows: should return an error, not panic +query error DataFusion error: Execution error: DATE_BIN source timestamp 9223372036854775807 cannot be represented in nanoseconds +select date_bin( + interval '1 nanosecond', + arrow_cast(9223372036854775807, 'Timestamp(Second, None)'), + timestamp '1970-01-01 00:00:00' +); + +# Source timestamp scaling to nanoseconds overflows in array path: should return an error, not panic +query error DataFusion error: Execution error: DATE_BIN source timestamp 9223372036854775807 cannot be represented in nanoseconds +select date_bin( + interval '1 nanosecond', + ts, + timestamp '1970-01-01 00:00:00' +) +from ( + values (arrow_cast(9223372036854775807, 'Timestamp(Second, None)')) +) as t(ts); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
