This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 169701e012 Optimize date_bin (2x faster) (#10215)
169701e012 is described below
commit 169701e0128911f16ed07e1ea714a4fdc1e90ee0
Author: Simon Vandel Sillesen <[email protected]>
AuthorDate: Thu Apr 25 11:57:06 2024 +0000
Optimize date_bin (2x faster) (#10215)
* add date_bin benchmark
* optimize date_bin
As mentioned in the docs for `PrimitiveArray::unary`, it is faster to apply an
infallible operation across both valid and invalid values, rather than
branching at every value.
1) Make stride function infallible
2) Use `unary` method
This yields the following speedup on my machine:
Before: 22.345 µs
After: 10.558 µs
So around 2x faster
---
datafusion/functions/Cargo.toml | 5 +++
datafusion/functions/benches/date_bin.rs | 57 +++++++++++++++++++++++++++
datafusion/functions/src/datetime/date_bin.rs | 19 +++++----
3 files changed, 71 insertions(+), 10 deletions(-)
diff --git a/datafusion/functions/Cargo.toml b/datafusion/functions/Cargo.toml
index 577ecdb746..0886dee034 100644
--- a/datafusion/functions/Cargo.toml
+++ b/datafusion/functions/Cargo.toml
@@ -112,6 +112,11 @@ harness = false
name = "make_date"
required-features = ["datetime_expressions"]
+[[bench]]
+harness = false
+name = "date_bin"
+required-features = ["datetime_expressions"]
+
[[bench]]
harness = false
name = "to_char"
diff --git a/datafusion/functions/benches/date_bin.rs
b/datafusion/functions/benches/date_bin.rs
new file mode 100644
index 0000000000..c881947354
--- /dev/null
+++ b/datafusion/functions/benches/date_bin.rs
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+extern crate criterion;
+
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, TimestampSecondArray};
+use criterion::{black_box, criterion_group, criterion_main, Criterion};
+use datafusion_common::ScalarValue;
+use rand::rngs::ThreadRng;
+use rand::Rng;
+
+use datafusion_expr::ColumnarValue;
+use datafusion_functions::datetime::date_bin;
+
+fn timestamps(rng: &mut ThreadRng) -> TimestampSecondArray {
+ let mut seconds = vec![];
+ for _ in 0..1000 {
+ seconds.push(rng.gen_range(0..1_000_000));
+ }
+
+ TimestampSecondArray::from(seconds)
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+ c.bench_function("date_bin_1000", |b| {
+ let mut rng = rand::thread_rng();
+ let interval = ColumnarValue::Scalar(ScalarValue::new_interval_dt(0,
1_000_000));
+ let timestamps = ColumnarValue::Array(Arc::new(timestamps(&mut rng))
as ArrayRef);
+ let udf = date_bin();
+
+ b.iter(|| {
+ black_box(
+ udf.invoke(&[interval.clone(), timestamps.clone()])
+ .expect("date_bin should work on valid values"),
+ )
+ })
+ });
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/datafusion/functions/src/datetime/date_bin.rs
b/datafusion/functions/src/datetime/date_bin.rs
index 7f5d9bb5d9..da1797cdae 100644
--- a/datafusion/functions/src/datetime/date_bin.rs
+++ b/datafusion/functions/src/datetime/date_bin.rs
@@ -320,14 +320,14 @@ fn date_bin_impl(
origin: i64,
stride: i64,
stride_fn: fn(i64, i64, i64) -> i64,
- ) -> impl Fn(Option<i64>) -> Option<i64> {
+ ) -> impl Fn(i64) -> i64 {
let scale = match T::UNIT {
Nanosecond => 1,
Microsecond => NANOSECONDS / 1_000_000,
Millisecond => NANOSECONDS / 1_000,
Second => NANOSECONDS,
};
- move |x: Option<i64>| x.map(|x| stride_fn(stride, x * scale, origin) /
scale)
+ move |x: i64| stride_fn(stride, x * scale, origin) / scale
}
Ok(match array {
@@ -335,7 +335,7 @@ fn date_bin_impl(
let apply_stride_fn =
stride_map_fn::<TimestampNanosecondType>(origin, stride,
stride_fn);
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
- apply_stride_fn(*v),
+ v.map(apply_stride_fn),
tz_opt.clone(),
))
}
@@ -343,7 +343,7 @@ fn date_bin_impl(
let apply_stride_fn =
stride_map_fn::<TimestampMicrosecondType>(origin, stride,
stride_fn);
ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
- apply_stride_fn(*v),
+ v.map(apply_stride_fn),
tz_opt.clone(),
))
}
@@ -351,7 +351,7 @@ fn date_bin_impl(
let apply_stride_fn =
stride_map_fn::<TimestampMillisecondType>(origin, stride,
stride_fn);
ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
- apply_stride_fn(*v),
+ v.map(apply_stride_fn),
tz_opt.clone(),
))
}
@@ -359,7 +359,7 @@ fn date_bin_impl(
let apply_stride_fn =
stride_map_fn::<TimestampSecondType>(origin, stride,
stride_fn);
ColumnarValue::Scalar(ScalarValue::TimestampSecond(
- apply_stride_fn(*v),
+ v.map(apply_stride_fn),
tz_opt.clone(),
))
}
@@ -377,14 +377,13 @@ fn date_bin_impl(
{
let array = as_primitive_array::<T>(array)?;
let apply_stride_fn = stride_map_fn::<T>(origin, stride,
stride_fn);
- let array = array
- .iter()
- .map(apply_stride_fn)
- .collect::<PrimitiveArray<T>>()
+ let array: PrimitiveArray<T> = array
+ .unary(apply_stride_fn)
.with_timezone_opt(tz_opt.clone());
Ok(ColumnarValue::Array(Arc::new(array)))
}
+
match array.data_type() {
Timestamp(Nanosecond, tz_opt) => {
transform_array_with_stride::<TimestampNanosecondType>(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]