This is an automated email from the ASF dual-hosted git repository.
jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 66180fa24e Migrate code from invoke to invoke_batch. (#13345)
66180fa24e is described below
commit 66180fa24ec889d92e771649738c2b2d362907df
Author: irenjj <[email protected]>
AuthorDate: Thu Nov 14 19:05:43 2024 +0800
Migrate code from invoke to invoke_batch. (#13345)
* migrate UDF invoke to invoke_batch
* fix
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/functions/src/datetime/date_bin.rs | 281 ++++++++++++++----------
datafusion/functions/src/datetime/date_trunc.rs | 28 ++-
datafusion/functions/src/datetime/make_date.rs | 102 +++++----
datafusion/functions/src/datetime/to_char.rs | 60 +++--
datafusion/functions/src/datetime/to_date.rs | 60 ++---
datafusion/functions/src/string/concat.rs | 3 +-
datafusion/functions/src/string/concat_ws.rs | 6 +-
datafusion/functions/src/string/contains.rs | 3 +-
datafusion/functions/src/string/lower.rs | 4 +-
datafusion/functions/src/string/upper.rs | 4 +-
10 files changed, 322 insertions(+), 229 deletions(-)
diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs
index 82481f9fff..671967a893 100644
--- a/datafusion/functions/src/datetime/date_bin.rs
+++ b/datafusion/functions/src/datetime/date_bin.rs
@@ -504,7 +504,7 @@ mod tests {
use crate::datetime::date_bin::{date_bin_nanos_interval, DateBinFunc};
use arrow::array::types::TimestampNanosecondType;
- use arrow::array::{IntervalDayTimeArray, TimestampNanosecondArray};
+ use arrow::array::{Array, IntervalDayTimeArray, TimestampNanosecondArray};
use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
use arrow::datatypes::{DataType, TimeUnit};
@@ -515,50 +515,68 @@ mod tests {
use chrono::TimeDelta;
#[test]
- #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
fn test_date_bin() {
- let res = DateBinFunc::new().invoke(&[
-
ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
- days: 0,
- milliseconds: 1,
- }))),
- ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1),
None)),
- ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1),
None)),
- ]);
+ let res = DateBinFunc::new().invoke_batch(
+ &[
+ ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
+ IntervalDayTime {
+ days: 0,
+ milliseconds: 1,
+ },
+ ))),
+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
+ ],
+ 1,
+ );
assert!(res.is_ok());
let timestamps =
Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
- let res = DateBinFunc::new().invoke(&[
-
ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
- days: 0,
- milliseconds: 1,
- }))),
- ColumnarValue::Array(timestamps),
- ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1),
None)),
- ]);
+ let batch_size = timestamps.len();
+ let res = DateBinFunc::new().invoke_batch(
+ &[
+ ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
+ IntervalDayTime {
+ days: 0,
+ milliseconds: 1,
+ },
+ ))),
+ ColumnarValue::Array(timestamps),
+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
+ ],
+ batch_size,
+ );
assert!(res.is_ok());
- let res = DateBinFunc::new().invoke(&[
-
ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
- days: 0,
- milliseconds: 1,
- }))),
- ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1),
None)),
- ]);
+ let res = DateBinFunc::new().invoke_batch(
+ &[
+ ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
+ IntervalDayTime {
+ days: 0,
+ milliseconds: 1,
+ },
+ ))),
+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
+ ],
+ 1,
+ );
assert!(res.is_ok());
// stride supports month-day-nano
- let res = DateBinFunc::new().invoke(&[
- ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(
- IntervalMonthDayNano {
- months: 0,
- days: 0,
- nanoseconds: 1,
- },
- ))),
- ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1),
None)),
- ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1),
None)),
- ]);
+ let res = DateBinFunc::new().invoke_batch(
+ &[
+ ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(
+ IntervalMonthDayNano {
+ months: 0,
+ days: 0,
+ nanoseconds: 1,
+ },
+ ))),
+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
+ ],
+ 1,
+ );
assert!(res.is_ok());
//
@@ -566,99 +584,129 @@ mod tests {
//
// invalid number of arguments
- let res = DateBinFunc::new().invoke(&[ColumnarValue::Scalar(
- ScalarValue::IntervalDayTime(Some(IntervalDayTime {
- days: 0,
- milliseconds: 1,
- })),
- )]);
+ let res = DateBinFunc::new().invoke_batch(
+ &[ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
+ IntervalDayTime {
+ days: 0,
+ milliseconds: 1,
+ },
+ )))],
+ 1,
+ );
assert_eq!(
res.err().unwrap().strip_backtrace(),
"Execution error: DATE_BIN expected two or three arguments"
);
// stride: invalid type
- let res = DateBinFunc::new().invoke(&[
- ColumnarValue::Scalar(ScalarValue::IntervalYearMonth(Some(1))),
- ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1),
None)),
- ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1),
None)),
- ]);
+ let res = DateBinFunc::new().invoke_batch(
+ &[
+ ColumnarValue::Scalar(ScalarValue::IntervalYearMonth(Some(1))),
+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
+ ],
+ 1,
+ );
assert_eq!(
res.err().unwrap().strip_backtrace(),
"Execution error: DATE_BIN expects stride argument to be an INTERVAL but got Interval(YearMonth)"
);
// stride: invalid value
- let res = DateBinFunc::new().invoke(&[
-
ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
- days: 0,
- milliseconds: 0,
- }))),
- ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1),
None)),
- ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1),
None)),
- ]);
+ let res = DateBinFunc::new().invoke_batch(
+ &[
+ ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
+ IntervalDayTime {
+ days: 0,
+ milliseconds: 0,
+ },
+ ))),
+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
+ ],
+ 1,
+ );
assert_eq!(
res.err().unwrap().strip_backtrace(),
"Execution error: DATE_BIN stride must be non-zero"
);
// stride: overflow of day-time interval
- let res = DateBinFunc::new().invoke(&[
- ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
- IntervalDayTime::MAX,
- ))),
- ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1),
None)),
- ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1),
None)),
- ]);
+ let res = DateBinFunc::new().invoke_batch(
+ &[
+ ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
+ IntervalDayTime::MAX,
+ ))),
+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
+ ],
+ 1,
+ );
assert_eq!(
res.err().unwrap().strip_backtrace(),
"Execution error: DATE_BIN stride argument is too large"
);
// stride: overflow of month-day-nano interval
- let res = DateBinFunc::new().invoke(&[
- ColumnarValue::Scalar(ScalarValue::new_interval_mdn(0, i32::MAX,
1)),
- ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1),
None)),
- ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1),
None)),
- ]);
+ let res = DateBinFunc::new().invoke_batch(
+ &[
+ ColumnarValue::Scalar(ScalarValue::new_interval_mdn(0,
i32::MAX, 1)),
+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
+ ],
+ 1,
+ );
assert_eq!(
res.err().unwrap().strip_backtrace(),
"Execution error: DATE_BIN stride argument is too large"
);
// stride: month intervals
- let res = DateBinFunc::new().invoke(&[
- ColumnarValue::Scalar(ScalarValue::new_interval_mdn(1, 1, 1)),
- ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1),
None)),
- ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1),
None)),
- ]);
+ let res = DateBinFunc::new().invoke_batch(
+ &[
+ ColumnarValue::Scalar(ScalarValue::new_interval_mdn(1, 1, 1)),
+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
+ ],
+ 1,
+ );
assert_eq!(
res.err().unwrap().strip_backtrace(),
"This feature is not implemented: DATE_BIN stride does not support combination of month, day and nanosecond intervals"
);
// origin: invalid type
- let res = DateBinFunc::new().invoke(&[
-
ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
- days: 0,
- milliseconds: 1,
- }))),
- ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1),
None)),
- ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1),
None)),
- ]);
+ let res = DateBinFunc::new().invoke_batch(
+ &[
+ ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
+ IntervalDayTime {
+ days: 0,
+ milliseconds: 1,
+ },
+ ))),
+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
+
ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
+ ],
+ 1,
+ );
assert_eq!(
res.err().unwrap().strip_backtrace(),
"Execution error: DATE_BIN expects origin argument to be a TIMESTAMP with nanosecond precision but got Timestamp(Microsecond, None)"
);
- let res = DateBinFunc::new().invoke(&[
-
ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
- days: 0,
- milliseconds: 1,
- }))),
- ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1),
None)),
- ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1),
None)),
- ]);
+ let res = DateBinFunc::new().invoke_batch(
+ &[
+ ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
+ IntervalDayTime {
+ days: 0,
+ milliseconds: 1,
+ },
+ ))),
+
ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
+ ],
+ 1,
+ );
assert!(res.is_ok());
// unsupported array type for stride
@@ -672,11 +720,15 @@ mod tests {
})
.collect::<IntervalDayTimeArray>(),
);
- let res = DateBinFunc::new().invoke(&[
- ColumnarValue::Array(intervals),
- ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1),
None)),
- ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1),
None)),
- ]);
+ let batch_size = intervals.len();
+ let res = DateBinFunc::new().invoke_batch(
+ &[
+ ColumnarValue::Array(intervals),
+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
+ ],
+ batch_size,
+ );
assert_eq!(
res.err().unwrap().strip_backtrace(),
"This feature is not implemented: DATE_BIN only supports literal values for the stride argument, not arrays"
@@ -684,14 +736,20 @@ mod tests {
// unsupported array type for origin
let timestamps =
Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
- let res = DateBinFunc::new().invoke(&[
-
ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
- days: 0,
- milliseconds: 1,
- }))),
- ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1),
None)),
- ColumnarValue::Array(timestamps),
- ]);
+ let batch_size = timestamps.len();
+ let res = DateBinFunc::new().invoke_batch(
+ &[
+ ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
+ IntervalDayTime {
+ days: 0,
+ milliseconds: 1,
+ },
+ ))),
+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
+ ColumnarValue::Array(timestamps),
+ ],
+ batch_size,
+ );
assert_eq!(
res.err().unwrap().strip_backtrace(),
"This feature is not implemented: DATE_BIN only supports literal values for the origin argument, not arrays"
@@ -806,16 +864,19 @@ mod tests {
.map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
.collect::<TimestampNanosecondArray>()
.with_timezone_opt(tz_opt.clone());
- #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
+ let batch_size = input.len();
let result = DateBinFunc::new()
- .invoke(&[
- ColumnarValue::Scalar(ScalarValue::new_interval_dt(1,
0)),
- ColumnarValue::Array(Arc::new(input)),
- ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
- Some(string_to_timestamp_nanos(origin).unwrap()),
- tz_opt.clone(),
- )),
- ])
+ .invoke_batch(
+ &[
+
ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)),
+ ColumnarValue::Array(Arc::new(input)),
+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
+
Some(string_to_timestamp_nanos(origin).unwrap()),
+ tz_opt.clone(),
+ )),
+ ],
+ batch_size,
+ )
.unwrap();
if let ColumnarValue::Array(result) = result {
assert_eq!(
diff --git a/datafusion/functions/src/datetime/date_trunc.rs b/datafusion/functions/src/datetime/date_trunc.rs
index f8abef601f..5ec308ef9c 100644
--- a/datafusion/functions/src/datetime/date_trunc.rs
+++ b/datafusion/functions/src/datetime/date_trunc.rs
@@ -484,7 +484,7 @@ mod tests {
use arrow::array::cast::as_primitive_array;
use arrow::array::types::TimestampNanosecondType;
- use arrow::array::TimestampNanosecondArray;
+ use arrow::array::{Array, TimestampNanosecondArray};
use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
use arrow::datatypes::{DataType, TimeUnit};
use datafusion_common::ScalarValue;
@@ -724,12 +724,15 @@ mod tests {
.map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
.collect::<TimestampNanosecondArray>()
.with_timezone_opt(tz_opt.clone());
- #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
+ let batch_size = input.len();
let result = DateTruncFunc::new()
- .invoke(&[
- ColumnarValue::Scalar(ScalarValue::from("day")),
- ColumnarValue::Array(Arc::new(input)),
- ])
+ .invoke_batch(
+ &[
+ ColumnarValue::Scalar(ScalarValue::from("day")),
+ ColumnarValue::Array(Arc::new(input)),
+ ],
+ batch_size,
+ )
.unwrap();
if let ColumnarValue::Array(result) = result {
assert_eq!(
@@ -883,12 +886,15 @@ mod tests {
.map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
.collect::<TimestampNanosecondArray>()
.with_timezone_opt(tz_opt.clone());
- #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
+ let batch_size = input.len();
let result = DateTruncFunc::new()
- .invoke(&[
- ColumnarValue::Scalar(ScalarValue::from("hour")),
- ColumnarValue::Array(Arc::new(input)),
- ])
+ .invoke_batch(
+ &[
+ ColumnarValue::Scalar(ScalarValue::from("hour")),
+ ColumnarValue::Array(Arc::new(input)),
+ ],
+ batch_size,
+ )
.unwrap();
if let ColumnarValue::Array(result) = result {
assert_eq!(
diff --git a/datafusion/functions/src/datetime/make_date.rs b/datafusion/functions/src/datetime/make_date.rs
index 6b246cb088..a13511f333 100644
--- a/datafusion/functions/src/datetime/make_date.rs
+++ b/datafusion/functions/src/datetime/make_date.rs
@@ -234,13 +234,15 @@ mod tests {
#[test]
fn test_make_date() {
- #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let res = MakeDateFunc::new()
- .invoke(&[
- ColumnarValue::Scalar(ScalarValue::Int32(Some(2024))),
- ColumnarValue::Scalar(ScalarValue::Int64(Some(1))),
- ColumnarValue::Scalar(ScalarValue::UInt32(Some(14))),
- ])
+ .invoke_batch(
+ &[
+ ColumnarValue::Scalar(ScalarValue::Int32(Some(2024))),
+ ColumnarValue::Scalar(ScalarValue::Int64(Some(1))),
+ ColumnarValue::Scalar(ScalarValue::UInt32(Some(14))),
+ ],
+ 1,
+ )
.expect("that make_date parsed values without error");
if let ColumnarValue::Scalar(ScalarValue::Date32(date)) = res {
@@ -249,13 +251,15 @@ mod tests {
panic!("Expected a scalar value")
}
- #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let res = MakeDateFunc::new()
- .invoke(&[
- ColumnarValue::Scalar(ScalarValue::Int64(Some(2024))),
- ColumnarValue::Scalar(ScalarValue::UInt64(Some(1))),
- ColumnarValue::Scalar(ScalarValue::UInt32(Some(14))),
- ])
+ .invoke_batch(
+ &[
+ ColumnarValue::Scalar(ScalarValue::Int64(Some(2024))),
+ ColumnarValue::Scalar(ScalarValue::UInt64(Some(1))),
+ ColumnarValue::Scalar(ScalarValue::UInt32(Some(14))),
+ ],
+ 1,
+ )
.expect("that make_date parsed values without error");
if let ColumnarValue::Scalar(ScalarValue::Date32(date)) = res {
@@ -264,13 +268,15 @@ mod tests {
panic!("Expected a scalar value")
}
- #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let res = MakeDateFunc::new()
- .invoke(&[
-
ColumnarValue::Scalar(ScalarValue::Utf8(Some("2024".to_string()))),
-
ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some("1".to_string()))),
-
ColumnarValue::Scalar(ScalarValue::Utf8(Some("14".to_string()))),
- ])
+ .invoke_batch(
+ &[
+
ColumnarValue::Scalar(ScalarValue::Utf8(Some("2024".to_string()))),
+
ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some("1".to_string()))),
+
ColumnarValue::Scalar(ScalarValue::Utf8(Some("14".to_string()))),
+ ],
+ 1,
+ )
.expect("that make_date parsed values without error");
if let ColumnarValue::Scalar(ScalarValue::Date32(date)) = res {
@@ -282,13 +288,16 @@ mod tests {
let years = Arc::new((2021..2025).map(Some).collect::<Int64Array>());
let months = Arc::new((1..5).map(Some).collect::<Int32Array>());
let days = Arc::new((11..15).map(Some).collect::<UInt32Array>());
- #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
+ let batch_size = years.len();
let res = MakeDateFunc::new()
- .invoke(&[
- ColumnarValue::Array(years),
- ColumnarValue::Array(months),
- ColumnarValue::Array(days),
- ])
+ .invoke_batch(
+ &[
+ ColumnarValue::Array(years),
+ ColumnarValue::Array(months),
+ ColumnarValue::Array(days),
+ ],
+ batch_size,
+ )
.expect("that make_date parsed values without error");
if let ColumnarValue::Array(array) = res {
@@ -308,45 +317,50 @@ mod tests {
//
// invalid number of arguments
- #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let res = MakeDateFunc::new()
- .invoke(&[ColumnarValue::Scalar(ScalarValue::Int32(Some(1)))]);
+
.invoke_batch(&[ColumnarValue::Scalar(ScalarValue::Int32(Some(1)))], 1);
assert_eq!(
res.err().unwrap().strip_backtrace(),
"Execution error: make_date function requires 3 arguments, got 1"
);
// invalid type
- #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
- let res = MakeDateFunc::new().invoke(&[
- ColumnarValue::Scalar(ScalarValue::IntervalYearMonth(Some(1))),
- ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1),
None)),
- ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1),
None)),
- ]);
+ let res = MakeDateFunc::new().invoke_batch(
+ &[
+ ColumnarValue::Scalar(ScalarValue::IntervalYearMonth(Some(1))),
+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
+ ],
+ 1,
+ );
assert_eq!(
res.err().unwrap().strip_backtrace(),
"Arrow error: Cast error: Casting from Interval(YearMonth) to Int32 not supported"
);
// overflow of month
- #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
- let res = MakeDateFunc::new().invoke(&[
- ColumnarValue::Scalar(ScalarValue::Int32(Some(2023))),
- ColumnarValue::Scalar(ScalarValue::UInt64(Some(u64::MAX))),
- ColumnarValue::Scalar(ScalarValue::Int32(Some(22))),
- ]);
+ let res = MakeDateFunc::new().invoke_batch(
+ &[
+ ColumnarValue::Scalar(ScalarValue::Int32(Some(2023))),
+ ColumnarValue::Scalar(ScalarValue::UInt64(Some(u64::MAX))),
+ ColumnarValue::Scalar(ScalarValue::Int32(Some(22))),
+ ],
+ 1,
+ );
assert_eq!(
res.err().unwrap().strip_backtrace(),
"Arrow error: Cast error: Can't cast value 18446744073709551615 to type Int32"
);
// overflow of day
- #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
- let res = MakeDateFunc::new().invoke(&[
- ColumnarValue::Scalar(ScalarValue::Int32(Some(2023))),
- ColumnarValue::Scalar(ScalarValue::Int32(Some(22))),
- ColumnarValue::Scalar(ScalarValue::UInt32(Some(u32::MAX))),
- ]);
+ let res = MakeDateFunc::new().invoke_batch(
+ &[
+ ColumnarValue::Scalar(ScalarValue::Int32(Some(2023))),
+ ColumnarValue::Scalar(ScalarValue::Int32(Some(22))),
+ ColumnarValue::Scalar(ScalarValue::UInt32(Some(u32::MAX))),
+ ],
+ 1,
+ );
assert_eq!(
res.err().unwrap().strip_backtrace(),
"Arrow error: Cast error: Can't cast value 4294967295 to type Int32"
diff --git a/datafusion/functions/src/datetime/to_char.rs b/datafusion/functions/src/datetime/to_char.rs
index ef5d6a4f69..dd4ae7b846 100644
--- a/datafusion/functions/src/datetime/to_char.rs
+++ b/datafusion/functions/src/datetime/to_char.rs
@@ -384,9 +384,11 @@ mod tests {
];
for (value, format, expected) in scalar_data {
- #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let result = ToCharFunc::new()
- .invoke(&[ColumnarValue::Scalar(value),
ColumnarValue::Scalar(format)])
+ .invoke_batch(
+ &[ColumnarValue::Scalar(value),
ColumnarValue::Scalar(format)],
+ 1,
+ )
.expect("that to_char parsed values without error");
if let ColumnarValue::Scalar(ScalarValue::Utf8(date)) = result {
@@ -459,12 +461,15 @@ mod tests {
];
for (value, format, expected) in scalar_array_data {
- #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
+ let batch_size = format.len();
let result = ToCharFunc::new()
- .invoke(&[
- ColumnarValue::Scalar(value),
- ColumnarValue::Array(Arc::new(format) as ArrayRef),
- ])
+ .invoke_batch(
+ &[
+ ColumnarValue::Scalar(value),
+ ColumnarValue::Array(Arc::new(format) as ArrayRef),
+ ],
+ batch_size,
+ )
.expect("that to_char parsed values without error");
if let ColumnarValue::Scalar(ScalarValue::Utf8(date)) = result {
@@ -585,12 +590,15 @@ mod tests {
];
for (value, format, expected) in array_scalar_data {
- #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
+ let batch_size = value.len();
let result = ToCharFunc::new()
- .invoke(&[
- ColumnarValue::Array(value as ArrayRef),
- ColumnarValue::Scalar(format),
- ])
+ .invoke_batch(
+ &[
+ ColumnarValue::Array(value as ArrayRef),
+ ColumnarValue::Scalar(format),
+ ],
+ batch_size,
+ )
.expect("that to_char parsed values without error");
if let ColumnarValue::Array(result) = result {
@@ -602,12 +610,15 @@ mod tests {
}
for (value, format, expected) in array_array_data {
- #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
+ let batch_size = value.len();
let result = ToCharFunc::new()
- .invoke(&[
- ColumnarValue::Array(value),
- ColumnarValue::Array(Arc::new(format) as ArrayRef),
- ])
+ .invoke_batch(
+ &[
+ ColumnarValue::Array(value),
+ ColumnarValue::Array(Arc::new(format) as ArrayRef),
+ ],
+ batch_size,
+ )
.expect("that to_char parsed values without error");
if let ColumnarValue::Array(result) = result {
@@ -623,20 +634,21 @@ mod tests {
//
// invalid number of arguments
- #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let result = ToCharFunc::new()
- .invoke(&[ColumnarValue::Scalar(ScalarValue::Int32(Some(1)))]);
+
.invoke_batch(&[ColumnarValue::Scalar(ScalarValue::Int32(Some(1)))], 1);
assert_eq!(
result.err().unwrap().strip_backtrace(),
"Execution error: to_char function requires 2 arguments, got 1"
);
// invalid type
- #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
- let result = ToCharFunc::new().invoke(&[
- ColumnarValue::Scalar(ScalarValue::Int32(Some(1))),
- ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1),
None)),
- ]);
+ let result = ToCharFunc::new().invoke_batch(
+ &[
+ ColumnarValue::Scalar(ScalarValue::Int32(Some(1))),
+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
+ ],
+ 1,
+ );
assert_eq!(
result.err().unwrap().strip_backtrace(),
"Execution error: Format for `to_char` must be non-null Utf8, received Timestamp(Nanosecond, None)"
diff --git a/datafusion/functions/src/datetime/to_date.rs b/datafusion/functions/src/datetime/to_date.rs
index 8f72100416..ff322ce319 100644
--- a/datafusion/functions/src/datetime/to_date.rs
+++ b/datafusion/functions/src/datetime/to_date.rs
@@ -213,8 +213,8 @@ mod tests {
}
fn test_scalar(sv: ScalarValue, tc: &TestCase) {
- #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
- let to_date_result =
ToDateFunc::new().invoke(&[ColumnarValue::Scalar(sv)]);
+ let to_date_result =
+ ToDateFunc::new().invoke_batch(&[ColumnarValue::Scalar(sv)],
1);
match to_date_result {
Ok(ColumnarValue::Scalar(ScalarValue::Date32(date_val))) => {
@@ -234,9 +234,9 @@ mod tests {
A: From<Vec<&'static str>> + Array + 'static,
{
let date_array = A::from(vec![tc.date_str]);
- #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
- let to_date_result =
-
ToDateFunc::new().invoke(&[ColumnarValue::Array(Arc::new(date_array))]);
+ let batch_size = date_array.len();
+ let to_date_result = ToDateFunc::new()
+ .invoke_batch(&[ColumnarValue::Array(Arc::new(date_array))],
batch_size);
match to_date_result {
Ok(ColumnarValue::Array(a)) => {
@@ -325,11 +325,13 @@ mod tests {
fn test_scalar(sv: ScalarValue, tc: &TestCase) {
let format_scalar =
ScalarValue::Utf8(Some(tc.format_str.to_string()));
- #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
- let to_date_result = ToDateFunc::new().invoke(&[
- ColumnarValue::Scalar(sv),
- ColumnarValue::Scalar(format_scalar),
- ]);
+ let to_date_result = ToDateFunc::new().invoke_batch(
+ &[
+ ColumnarValue::Scalar(sv),
+ ColumnarValue::Scalar(format_scalar),
+ ],
+ 1,
+ );
match to_date_result {
Ok(ColumnarValue::Scalar(ScalarValue::Date32(date_val))) => {
@@ -350,11 +352,14 @@ mod tests {
let date_array = A::from(vec![tc.formatted_date]);
let format_array = A::from(vec![tc.format_str]);
- #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
- let to_date_result = ToDateFunc::new().invoke(&[
- ColumnarValue::Array(Arc::new(date_array)),
- ColumnarValue::Array(Arc::new(format_array)),
- ]);
+ let batch_size = date_array.len();
+ let to_date_result = ToDateFunc::new().invoke_batch(
+ &[
+ ColumnarValue::Array(Arc::new(date_array)),
+ ColumnarValue::Array(Arc::new(format_array)),
+ ],
+ batch_size,
+ );
match to_date_result {
Ok(ColumnarValue::Array(a)) => {
@@ -386,12 +391,14 @@ mod tests {
let format1_scalar = ScalarValue::Utf8(Some("%Y-%m-%d".into()));
let format2_scalar = ScalarValue::Utf8(Some("%Y/%m/%d".into()));
- #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
- let to_date_result = ToDateFunc::new().invoke(&[
- ColumnarValue::Scalar(formatted_date_scalar),
- ColumnarValue::Scalar(format1_scalar),
- ColumnarValue::Scalar(format2_scalar),
- ]);
+ let to_date_result = ToDateFunc::new().invoke_batch(
+ &[
+ ColumnarValue::Scalar(formatted_date_scalar),
+ ColumnarValue::Scalar(format1_scalar),
+ ColumnarValue::Scalar(format2_scalar),
+ ],
+ 1,
+ );
match to_date_result {
Ok(ColumnarValue::Scalar(ScalarValue::Date32(date_val))) => {
@@ -415,9 +422,8 @@ mod tests {
for date_str in test_cases {
let formatted_date_scalar =
ScalarValue::Utf8(Some(date_str.into()));
- #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
- let to_date_result =
-
ToDateFunc::new().invoke(&[ColumnarValue::Scalar(formatted_date_scalar)]);
+ let to_date_result = ToDateFunc::new()
+ .invoke_batch(&[ColumnarValue::Scalar(formatted_date_scalar)],
1);
match to_date_result {
Ok(ColumnarValue::Scalar(ScalarValue::Date32(date_val))) => {
@@ -434,9 +440,8 @@ mod tests {
let date_str = "20241231";
let date_scalar = ScalarValue::Utf8(Some(date_str.into()));
- #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let to_date_result =
- ToDateFunc::new().invoke(&[ColumnarValue::Scalar(date_scalar)]);
+
ToDateFunc::new().invoke_batch(&[ColumnarValue::Scalar(date_scalar)], 1);
match to_date_result {
Ok(ColumnarValue::Scalar(ScalarValue::Date32(date_val))) => {
@@ -456,9 +461,8 @@ mod tests {
let date_str = "202412311";
let date_scalar = ScalarValue::Utf8(Some(date_str.into()));
- #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let to_date_result =
- ToDateFunc::new().invoke(&[ColumnarValue::Scalar(date_scalar)]);
+
ToDateFunc::new().invoke_batch(&[ColumnarValue::Scalar(date_scalar)], 1);
if let Ok(ColumnarValue::Scalar(ScalarValue::Date32(_))) =
to_date_result {
panic!(
diff --git a/datafusion/functions/src/string/concat.rs b/datafusion/functions/src/string/concat.rs
index e429a938b2..f1e60004dd 100644
--- a/datafusion/functions/src/string/concat.rs
+++ b/datafusion/functions/src/string/concat.rs
@@ -408,8 +408,7 @@ mod tests {
])));
let args = &[c0, c1, c2];
- #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
- let result = ConcatFunc::new().invoke(args)?;
+ let result = ConcatFunc::new().invoke_batch(args, 3)?;
let expected =
Arc::new(StringArray::from(vec!["foo,x", "bar,", "baz,z"])) as
ArrayRef;
match &result {
diff --git a/datafusion/functions/src/string/concat_ws.rs b/datafusion/functions/src/string/concat_ws.rs
index 611c48a963..98a75f121c 100644
--- a/datafusion/functions/src/string/concat_ws.rs
+++ b/datafusion/functions/src/string/concat_ws.rs
@@ -467,8 +467,7 @@ mod tests {
])));
let args = &[c0, c1, c2];
- #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
- let result = ConcatWsFunc::new().invoke(args)?;
+ let result = ConcatWsFunc::new().invoke_batch(args, 3)?;
let expected =
Arc::new(StringArray::from(vec!["foo,x", "bar", "baz,z"])) as
ArrayRef;
match &result {
@@ -493,8 +492,7 @@ mod tests {
])));
let args = &[c0, c1, c2];
- #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
- let result = ConcatWsFunc::new().invoke(args)?;
+ let result = ConcatWsFunc::new().invoke_batch(args, 3)?;
let expected =
Arc::new(StringArray::from(vec![Some("foo,x"), None,
Some("baz+z")]))
as ArrayRef;
diff --git a/datafusion/functions/src/string/contains.rs b/datafusion/functions/src/string/contains.rs
index 0c665a1391..3acd246452 100644
--- a/datafusion/functions/src/string/contains.rs
+++ b/datafusion/functions/src/string/contains.rs
@@ -145,8 +145,7 @@ mod test {
Some("yyy?()"),
])));
let scalar =
ColumnarValue::Scalar(ScalarValue::Utf8(Some("x?(".to_string())));
- #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
- let actual = udf.invoke(&[array, scalar]).unwrap();
+ let actual = udf.invoke_batch(&[array, scalar], 2).unwrap();
let expect = ColumnarValue::Array(Arc::new(BooleanArray::from(vec![
Some(true),
Some(false),
diff --git a/datafusion/functions/src/string/lower.rs b/datafusion/functions/src/string/lower.rs
index 02770e5e22..78887fde0a 100644
--- a/datafusion/functions/src/string/lower.rs
+++ b/datafusion/functions/src/string/lower.rs
@@ -104,9 +104,9 @@ mod tests {
fn to_lower(input: ArrayRef, expected: ArrayRef) -> Result<()> {
let func = LowerFunc::new();
+ let batch_size = input.len();
let args = vec![ColumnarValue::Array(input)];
- #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
- let result = match func.invoke(&args)? {
+ let result = match func.invoke_batch(&args, batch_size)? {
ColumnarValue::Array(result) => result,
_ => unreachable!("lower"),
};
diff --git a/datafusion/functions/src/string/upper.rs b/datafusion/functions/src/string/upper.rs
index 1293e51fa9..5039d094f2 100644
--- a/datafusion/functions/src/string/upper.rs
+++ b/datafusion/functions/src/string/upper.rs
@@ -104,9 +104,9 @@ mod tests {
fn to_upper(input: ArrayRef, expected: ArrayRef) -> Result<()> {
let func = UpperFunc::new();
+ let batch_size = input.len();
let args = vec![ColumnarValue::Array(input)];
- #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
- let result = match func.invoke(&args)? {
+ let result = match func.invoke_batch(&args, batch_size)? {
ColumnarValue::Array(result) => result,
_ => unreachable!("upper"),
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]