This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 169701e012 Optimize date_bin (2x faster) (#10215)
169701e012 is described below

commit 169701e0128911f16ed07e1ea714a4fdc1e90ee0
Author: Simon Vandel Sillesen <[email protected]>
AuthorDate: Thu Apr 25 11:57:06 2024 +0000

    Optimize date_bin (2x faster) (#10215)
    
    * add date_bin benchmark
    
    * optimize date_bin
    
    As mentioned in the docs for `PrimitiveArray::unary`, it is faster to apply an
    infallible operation across both valid and invalid values, rather than
    branching at every value.
    
    1) Make stride function infallible
    2) Use `unary` method
    
    This gives this speedup on my machine:
    Before: 22.345 µs
    After: 10.558 µs
    
    So around 2x faster
---
 datafusion/functions/Cargo.toml               |  5 +++
 datafusion/functions/benches/date_bin.rs      | 57 +++++++++++++++++++++++++++
 datafusion/functions/src/datetime/date_bin.rs | 19 +++++----
 3 files changed, 71 insertions(+), 10 deletions(-)

diff --git a/datafusion/functions/Cargo.toml b/datafusion/functions/Cargo.toml
index 577ecdb746..0886dee034 100644
--- a/datafusion/functions/Cargo.toml
+++ b/datafusion/functions/Cargo.toml
@@ -112,6 +112,11 @@ harness = false
 name = "make_date"
 required-features = ["datetime_expressions"]
 
+[[bench]]
+harness = false
+name = "date_bin"
+required-features = ["datetime_expressions"]
+
 [[bench]]
 harness = false
 name = "to_char"
diff --git a/datafusion/functions/benches/date_bin.rs b/datafusion/functions/benches/date_bin.rs
new file mode 100644
index 0000000000..c881947354
--- /dev/null
+++ b/datafusion/functions/benches/date_bin.rs
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+extern crate criterion;
+
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, TimestampSecondArray};
+use criterion::{black_box, criterion_group, criterion_main, Criterion};
+use datafusion_common::ScalarValue;
+use rand::rngs::ThreadRng;
+use rand::Rng;
+
+use datafusion_expr::ColumnarValue;
+use datafusion_functions::datetime::date_bin;
+
+fn timestamps(rng: &mut ThreadRng) -> TimestampSecondArray {
+    let mut seconds = vec![];
+    for _ in 0..1000 {
+        seconds.push(rng.gen_range(0..1_000_000));
+    }
+
+    TimestampSecondArray::from(seconds)
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+    c.bench_function("date_bin_1000", |b| {
+        let mut rng = rand::thread_rng();
+        let interval = ColumnarValue::Scalar(ScalarValue::new_interval_dt(0, 
1_000_000));
+        let timestamps = ColumnarValue::Array(Arc::new(timestamps(&mut rng)) 
as ArrayRef);
+        let udf = date_bin();
+
+        b.iter(|| {
+            black_box(
+                udf.invoke(&[interval.clone(), timestamps.clone()])
+                    .expect("date_bin should work on valid values"),
+            )
+        })
+    });
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs
index 7f5d9bb5d9..da1797cdae 100644
--- a/datafusion/functions/src/datetime/date_bin.rs
+++ b/datafusion/functions/src/datetime/date_bin.rs
@@ -320,14 +320,14 @@ fn date_bin_impl(
         origin: i64,
         stride: i64,
         stride_fn: fn(i64, i64, i64) -> i64,
-    ) -> impl Fn(Option<i64>) -> Option<i64> {
+    ) -> impl Fn(i64) -> i64 {
         let scale = match T::UNIT {
             Nanosecond => 1,
             Microsecond => NANOSECONDS / 1_000_000,
             Millisecond => NANOSECONDS / 1_000,
             Second => NANOSECONDS,
         };
-        move |x: Option<i64>| x.map(|x| stride_fn(stride, x * scale, origin) / 
scale)
+        move |x: i64| stride_fn(stride, x * scale, origin) / scale
     }
 
     Ok(match array {
@@ -335,7 +335,7 @@ fn date_bin_impl(
             let apply_stride_fn =
                 stride_map_fn::<TimestampNanosecondType>(origin, stride, 
stride_fn);
             ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
-                apply_stride_fn(*v),
+                v.map(apply_stride_fn),
                 tz_opt.clone(),
             ))
         }
@@ -343,7 +343,7 @@ fn date_bin_impl(
             let apply_stride_fn =
                 stride_map_fn::<TimestampMicrosecondType>(origin, stride, 
stride_fn);
             ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
-                apply_stride_fn(*v),
+                v.map(apply_stride_fn),
                 tz_opt.clone(),
             ))
         }
@@ -351,7 +351,7 @@ fn date_bin_impl(
             let apply_stride_fn =
                 stride_map_fn::<TimestampMillisecondType>(origin, stride, 
stride_fn);
             ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
-                apply_stride_fn(*v),
+                v.map(apply_stride_fn),
                 tz_opt.clone(),
             ))
         }
@@ -359,7 +359,7 @@ fn date_bin_impl(
             let apply_stride_fn =
                 stride_map_fn::<TimestampSecondType>(origin, stride, 
stride_fn);
             ColumnarValue::Scalar(ScalarValue::TimestampSecond(
-                apply_stride_fn(*v),
+                v.map(apply_stride_fn),
                 tz_opt.clone(),
             ))
         }
@@ -377,14 +377,13 @@ fn date_bin_impl(
             {
                 let array = as_primitive_array::<T>(array)?;
                 let apply_stride_fn = stride_map_fn::<T>(origin, stride, 
stride_fn);
-                let array = array
-                    .iter()
-                    .map(apply_stride_fn)
-                    .collect::<PrimitiveArray<T>>()
+                let array: PrimitiveArray<T> = array
+                    .unary(apply_stride_fn)
                     .with_timezone_opt(tz_opt.clone());
 
                 Ok(ColumnarValue::Array(Arc::new(array)))
             }
+
             match array.data_type() {
                 Timestamp(Nanosecond, tz_opt) => {
                     transform_array_with_stride::<TimestampNanosecondType>(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to