This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 98f02ffec5 Clean internal implementation of WindowUDF (#8746)
98f02ffec5 is described below
commit 98f02ffec5e51861185be298ff1981eef5f04958
Author: junxiangMu <[email protected]>
AuthorDate: Sat Jan 6 05:06:31 2024 +0800
Clean internal implementation of WindowUDF (#8746)
* Clean internal implementation of WindowUDF
* fix doc
---
datafusion-examples/examples/advanced_udwf.rs | 1 +
.../user_defined/user_defined_window_functions.rs | 1 +
datafusion/expr/src/expr_fn.rs | 12 ++
datafusion/expr/src/udwf.rs | 138 +++++++++++++--------
.../proto/tests/cases/roundtrip_logical_plan.rs | 1 +
5 files changed, 103 insertions(+), 50 deletions(-)
diff --git a/datafusion-examples/examples/advanced_udwf.rs b/datafusion-examples/examples/advanced_udwf.rs
index 91869d80a4..f46031434f 100644
--- a/datafusion-examples/examples/advanced_udwf.rs
+++ b/datafusion-examples/examples/advanced_udwf.rs
@@ -34,6 +34,7 @@ use datafusion_expr::{
/// a function `partition_evaluator` that returns the `MyPartitionEvaluator` instance.
///
/// To do so, we must implement the `WindowUDFImpl` trait.
+#[derive(Debug, Clone)]
struct SmoothItUdf {
signature: Signature,
}
diff --git a/datafusion/core/tests/user_defined/user_defined_window_functions.rs b/datafusion/core/tests/user_defined/user_defined_window_functions.rs
index 3040fbafe8..54eab4315a 100644
--- a/datafusion/core/tests/user_defined/user_defined_window_functions.rs
+++ b/datafusion/core/tests/user_defined/user_defined_window_functions.rs
@@ -471,6 +471,7 @@ impl OddCounter {
}
fn register(ctx: &mut SessionContext, test_state: Arc<TestState>) {
+ #[derive(Debug, Clone)]
struct SimpleWindowUDF {
signature: Signature,
return_type: DataType,
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index f76fb17b38..7b3f652485 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -32,6 +32,7 @@ use crate::{ColumnarValue, ScalarUDFImpl, WindowUDF, WindowUDFImpl};
use arrow::datatypes::DataType;
use datafusion_common::{Column, Result};
use std::any::Any;
+use std::fmt::Debug;
use std::ops::Not;
use std::sync::Arc;
@@ -1078,6 +1079,17 @@ pub struct SimpleWindowUDF {
partition_evaluator_factory: PartitionEvaluatorFactory,
}
+impl Debug for SimpleWindowUDF {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ f.debug_struct("WindowUDF")
+ .field("name", &self.name)
+ .field("signature", &self.signature)
+ .field("return_type", &"<func>")
+ .field("partition_evaluator_factory", &"<FUNC>")
+ .finish()
+ }
+}
+
impl SimpleWindowUDF {
/// Create a new `SimpleWindowUDF` from a name, input types, return type and
/// implementation. Implementing [`WindowUDFImpl`] allows more flexibility
diff --git a/datafusion/expr/src/udwf.rs b/datafusion/expr/src/udwf.rs
index 800386bfc7..239a5e24cb 100644
--- a/datafusion/expr/src/udwf.rs
+++ b/datafusion/expr/src/udwf.rs
@@ -34,40 +34,33 @@ use std::{
///
/// See the documentation on [`PartitionEvaluator`] for more details
///
+/// 1. For simple (less performant) use cases, use [`create_udwf`] and [`simple_udwf.rs`].
+///
+/// 2. For advanced use cases, use [`WindowUDFImpl`] and [`advanced_udwf.rs`].
+///
+/// # API Note
+/// This is a separate struct from `WindowUDFImpl` to maintain backwards
+/// compatibility with the older API.
+///
/// [`PartitionEvaluator`]: crate::PartitionEvaluator
-#[derive(Clone)]
+/// [`create_udwf`]: crate::expr_fn::create_udwf
+/// [`simple_udwf.rs`]: https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/simple_udwf.rs
+/// [`advanced_udwf.rs`]: https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/advanced_udwf.rs
+#[derive(Debug, Clone)]
pub struct WindowUDF {
- /// name
- name: String,
- /// signature
- signature: Signature,
- /// Return type
- return_type: ReturnTypeFunction,
- /// Return the partition evaluator
- partition_evaluator_factory: PartitionEvaluatorFactory,
-}
-
-impl Debug for WindowUDF {
- fn fmt(&self, f: &mut Formatter) -> fmt::Result {
- f.debug_struct("WindowUDF")
- .field("name", &self.name)
- .field("signature", &self.signature)
- .field("return_type", &"<func>")
- .field("partition_evaluator_factory", &"<func>")
- .finish_non_exhaustive()
- }
+ inner: Arc<dyn WindowUDFImpl>,
}
/// Defines how the WindowUDF is shown to users
impl Display for WindowUDF {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
- write!(f, "{}", self.name)
+ write!(f, "{}", self.name())
}
}
impl PartialEq for WindowUDF {
fn eq(&self, other: &Self) -> bool {
- self.name == other.name && self.signature == other.signature
+ self.name() == other.name() && self.signature() == other.signature()
}
}
@@ -75,8 +68,8 @@ impl Eq for WindowUDF {}
impl std::hash::Hash for WindowUDF {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
- self.name.hash(state);
- self.signature.hash(state);
+ self.name().hash(state);
+ self.signature().hash(state);
}
}
@@ -92,12 +85,12 @@ impl WindowUDF {
return_type: &ReturnTypeFunction,
partition_evaluator_factory: &PartitionEvaluatorFactory,
) -> Self {
- Self {
- name: name.to_string(),
+ Self::new_from_impl(WindowUDFLegacyWrapper {
+ name: name.to_owned(),
signature: signature.clone(),
return_type: return_type.clone(),
partition_evaluator_factory: partition_evaluator_factory.clone(),
- }
+ })
}
/// Create a new `WindowUDF` from a [`WindowUDFImpl`] trait object
@@ -105,27 +98,18 @@ impl WindowUDF {
/// Note this is the same as using the `From` impl (`WindowUDF::from`)
pub fn new_from_impl<F>(fun: F) -> WindowUDF
where
- F: WindowUDFImpl + Send + Sync + 'static,
+ F: WindowUDFImpl + 'static,
{
- let arc_fun = Arc::new(fun);
- let captured_self = arc_fun.clone();
- let return_type: ReturnTypeFunction = Arc::new(move |arg_types| {
- let return_type = captured_self.return_type(arg_types)?;
- Ok(Arc::new(return_type))
- });
-
- let captured_self = arc_fun.clone();
- let partition_evaluator_factory: PartitionEvaluatorFactory =
- Arc::new(move || captured_self.partition_evaluator());
-
Self {
- name: arc_fun.name().to_string(),
- signature: arc_fun.signature().clone(),
- return_type: return_type.clone(),
- partition_evaluator_factory,
+ inner: Arc::new(fun),
}
}
+ /// Return the underlying [`WindowUDFImpl`] trait object for this function
+ pub fn inner(&self) -> Arc<dyn WindowUDFImpl> {
+ self.inner.clone()
+ }
+
/// creates a [`Expr`] that calls the window function given
/// the `partition_by`, `order_by`, and `window_frame` definition
///
@@ -150,25 +134,29 @@ impl WindowUDF {
}
/// Returns this function's name
+ ///
+ /// See [`WindowUDFImpl::name`] for more details.
pub fn name(&self) -> &str {
- &self.name
+ self.inner.name()
}
/// Returns this function's signature (what input types are accepted)
+ ///
+ /// See [`WindowUDFImpl::signature`] for more details.
pub fn signature(&self) -> &Signature {
- &self.signature
+ self.inner.signature()
}
/// Return the type of the function given its input types
+ ///
+ /// See [`WindowUDFImpl::return_type`] for more details.
pub fn return_type(&self, args: &[DataType]) -> Result<DataType> {
- // Old API returns an Arc of the datatype for some reason
- let res = (self.return_type)(args)?;
- Ok(res.as_ref().clone())
+ self.inner.return_type(args)
}
/// Return a `PartitionEvaluator` for evaluating this window function
pub fn partition_evaluator_factory(&self) -> Result<Box<dyn
PartitionEvaluator>> {
- (self.partition_evaluator_factory)()
+ self.inner.partition_evaluator()
}
}
@@ -198,6 +186,7 @@ where
/// # use datafusion_common::{DataFusionError, plan_err, Result};
/// # use datafusion_expr::{col, Signature, Volatility, PartitionEvaluator, WindowFrame};
/// # use datafusion_expr::{WindowUDFImpl, WindowUDF};
+/// #[derive(Debug, Clone)]
/// struct SmoothIt {
/// signature: Signature
/// };
@@ -236,7 +225,7 @@ where
/// WindowFrame::new(false),
/// );
/// ```
-pub trait WindowUDFImpl {
+pub trait WindowUDFImpl: Debug + Send + Sync {
/// Returns this object as an [`Any`] trait object
fn as_any(&self) -> &dyn Any;
@@ -254,3 +243,52 @@ pub trait WindowUDFImpl {
/// Invoke the function, returning the [`PartitionEvaluator`] instance
fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>>;
}
+
+/// Implementation of [`WindowUDFImpl`] that wraps the function style pointers
+/// of the older API (see <https://github.com/apache/arrow-datafusion/pull/8719>
+/// for more details)
+pub struct WindowUDFLegacyWrapper {
+ /// name
+ name: String,
+ /// signature
+ signature: Signature,
+ /// Return type
+ return_type: ReturnTypeFunction,
+ /// Return the partition evaluator
+ partition_evaluator_factory: PartitionEvaluatorFactory,
+}
+
+impl Debug for WindowUDFLegacyWrapper {
+ fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+ f.debug_struct("WindowUDF")
+ .field("name", &self.name)
+ .field("signature", &self.signature)
+ .field("return_type", &"<func>")
+ .field("partition_evaluator_factory", &"<func>")
+ .finish_non_exhaustive()
+ }
+}
+
+impl WindowUDFImpl for WindowUDFLegacyWrapper {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn name(&self) -> &str {
+ &self.name
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+ // Old API returns an Arc of the datatype for some reason
+ let res = (self.return_type)(arg_types)?;
+ Ok(res.as_ref().clone())
+ }
+
+ fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
+ (self.partition_evaluator_factory)()
+ }
+}
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 402781e17e..03daf535f2 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -1787,6 +1787,7 @@ fn roundtrip_window() {
}
}
+ #[derive(Debug, Clone)]
struct SimpleWindowUDF {
signature: Signature,
}