This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 98f02ffec5 Clean internal implementation of WindowUDF (#8746)
98f02ffec5 is described below

commit 98f02ffec5e51861185be298ff1981eef5f04958
Author: junxiangMu <[email protected]>
AuthorDate: Sat Jan 6 05:06:31 2024 +0800

    Clean internal implementation of WindowUDF (#8746)
    
    * Clean internal implementation of WindowUDF
    
    * fix doc
---
 datafusion-examples/examples/advanced_udwf.rs      |   1 +
 .../user_defined/user_defined_window_functions.rs  |   1 +
 datafusion/expr/src/expr_fn.rs                     |  12 ++
 datafusion/expr/src/udwf.rs                        | 138 +++++++++++++--------
 .../proto/tests/cases/roundtrip_logical_plan.rs    |   1 +
 5 files changed, 103 insertions(+), 50 deletions(-)

diff --git a/datafusion-examples/examples/advanced_udwf.rs 
b/datafusion-examples/examples/advanced_udwf.rs
index 91869d80a4..f46031434f 100644
--- a/datafusion-examples/examples/advanced_udwf.rs
+++ b/datafusion-examples/examples/advanced_udwf.rs
@@ -34,6 +34,7 @@ use datafusion_expr::{
 /// a function `partition_evaluator` that returns the `MyPartitionEvaluator` 
instance.
 ///
 /// To do so, we must implement the `WindowUDFImpl` trait.
+#[derive(Debug, Clone)]
 struct SmoothItUdf {
     signature: Signature,
 }
diff --git 
a/datafusion/core/tests/user_defined/user_defined_window_functions.rs 
b/datafusion/core/tests/user_defined/user_defined_window_functions.rs
index 3040fbafe8..54eab4315a 100644
--- a/datafusion/core/tests/user_defined/user_defined_window_functions.rs
+++ b/datafusion/core/tests/user_defined/user_defined_window_functions.rs
@@ -471,6 +471,7 @@ impl OddCounter {
     }
 
     fn register(ctx: &mut SessionContext, test_state: Arc<TestState>) {
+        #[derive(Debug, Clone)]
         struct SimpleWindowUDF {
             signature: Signature,
             return_type: DataType,
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index f76fb17b38..7b3f652485 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -32,6 +32,7 @@ use crate::{ColumnarValue, ScalarUDFImpl, WindowUDF, 
WindowUDFImpl};
 use arrow::datatypes::DataType;
 use datafusion_common::{Column, Result};
 use std::any::Any;
+use std::fmt::Debug;
 use std::ops::Not;
 use std::sync::Arc;
 
@@ -1078,6 +1079,17 @@ pub struct SimpleWindowUDF {
     partition_evaluator_factory: PartitionEvaluatorFactory,
 }
 
+impl Debug for SimpleWindowUDF {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        f.debug_struct("WindowUDF")
+            .field("name", &self.name)
+            .field("signature", &self.signature)
+            .field("return_type", &"<func>")
+            .field("partition_evaluator_factory", &"<FUNC>")
+            .finish()
+    }
+}
+
 impl SimpleWindowUDF {
     /// Create a new `SimpleWindowUDF` from a name, input types, return type 
and
     /// implementation. Implementing [`WindowUDFImpl`] allows more flexibility
diff --git a/datafusion/expr/src/udwf.rs b/datafusion/expr/src/udwf.rs
index 800386bfc7..239a5e24cb 100644
--- a/datafusion/expr/src/udwf.rs
+++ b/datafusion/expr/src/udwf.rs
@@ -34,40 +34,33 @@ use std::{
 ///
 /// See the documentation on [`PartitionEvaluator`] for more details
 ///
+/// 1. For simple (less performant) use cases, use [`create_udwf`] and [`simple_udwf.rs`].
+///
+/// 2. For advanced use cases, use [`WindowUDFImpl`] and [`advanced_udwf.rs`].
+///
+/// # API Note
+/// This is a separate struct from `WindowUDFImpl` to maintain backwards
+/// compatibility with the older API.
+///
 /// [`PartitionEvaluator`]: crate::PartitionEvaluator
-#[derive(Clone)]
+/// [`create_udwf`]: crate::expr_fn::create_udwf
+/// [`simple_udwf.rs`]: https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/simple_udwf.rs
+/// [`advanced_udwf.rs`]: https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/advanced_udwf.rs
+#[derive(Debug, Clone)]
 pub struct WindowUDF {
-    /// name
-    name: String,
-    /// signature
-    signature: Signature,
-    /// Return type
-    return_type: ReturnTypeFunction,
-    /// Return the partition evaluator
-    partition_evaluator_factory: PartitionEvaluatorFactory,
-}
-
-impl Debug for WindowUDF {
-    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
-        f.debug_struct("WindowUDF")
-            .field("name", &self.name)
-            .field("signature", &self.signature)
-            .field("return_type", &"<func>")
-            .field("partition_evaluator_factory", &"<func>")
-            .finish_non_exhaustive()
-    }
+    inner: Arc<dyn WindowUDFImpl>,
 }
 
 /// Defines how the WindowUDF is shown to users
 impl Display for WindowUDF {
     fn fmt(&self, f: &mut Formatter) -> fmt::Result {
-        write!(f, "{}", self.name)
+        write!(f, "{}", self.name())
     }
 }
 
 impl PartialEq for WindowUDF {
     fn eq(&self, other: &Self) -> bool {
-        self.name == other.name && self.signature == other.signature
+        self.name() == other.name() && self.signature() == other.signature()
     }
 }
 
@@ -75,8 +68,8 @@ impl Eq for WindowUDF {}
 
 impl std::hash::Hash for WindowUDF {
     fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
-        self.name.hash(state);
-        self.signature.hash(state);
+        self.name().hash(state);
+        self.signature().hash(state);
     }
 }
 
@@ -92,12 +85,12 @@ impl WindowUDF {
         return_type: &ReturnTypeFunction,
         partition_evaluator_factory: &PartitionEvaluatorFactory,
     ) -> Self {
-        Self {
-            name: name.to_string(),
+        Self::new_from_impl(WindowUDFLegacyWrapper {
+            name: name.to_owned(),
             signature: signature.clone(),
             return_type: return_type.clone(),
             partition_evaluator_factory: partition_evaluator_factory.clone(),
-        }
+        })
     }
 
     /// Create a new `WindowUDF` from a [`WindowUDFImpl`] trait object
@@ -105,27 +98,18 @@ impl WindowUDF {
     /// Note this is the same as using the `From` impl (`WindowUDF::from`)
     pub fn new_from_impl<F>(fun: F) -> WindowUDF
     where
-        F: WindowUDFImpl + Send + Sync + 'static,
+        F: WindowUDFImpl + 'static,
     {
-        let arc_fun = Arc::new(fun);
-        let captured_self = arc_fun.clone();
-        let return_type: ReturnTypeFunction = Arc::new(move |arg_types| {
-            let return_type = captured_self.return_type(arg_types)?;
-            Ok(Arc::new(return_type))
-        });
-
-        let captured_self = arc_fun.clone();
-        let partition_evaluator_factory: PartitionEvaluatorFactory =
-            Arc::new(move || captured_self.partition_evaluator());
-
         Self {
-            name: arc_fun.name().to_string(),
-            signature: arc_fun.signature().clone(),
-            return_type: return_type.clone(),
-            partition_evaluator_factory,
+            inner: Arc::new(fun),
         }
     }
 
+    /// Return the underlying [`WindowUDFImpl`] trait object for this function
+    pub fn inner(&self) -> Arc<dyn WindowUDFImpl> {
+        self.inner.clone()
+    }
+
     /// creates a [`Expr`] that calls the window function given
     /// the `partition_by`, `order_by`, and `window_frame` definition
     ///
@@ -150,25 +134,29 @@ impl WindowUDF {
     }
 
     /// Returns this function's name
+    ///
+    /// See [`WindowUDFImpl::name`] for more details.
     pub fn name(&self) -> &str {
-        &self.name
+        self.inner.name()
     }
 
     /// Returns this function's signature (what input types are accepted)
+    ///
+    /// See [`WindowUDFImpl::signature`] for more details.
     pub fn signature(&self) -> &Signature {
-        &self.signature
+        self.inner.signature()
     }
 
     /// Return the type of the function given its input types
+    ///
+    /// See [`WindowUDFImpl::return_type`] for more details.
     pub fn return_type(&self, args: &[DataType]) -> Result<DataType> {
-        // Old API returns an Arc of the datatype for some reason
-        let res = (self.return_type)(args)?;
-        Ok(res.as_ref().clone())
+        self.inner.return_type(args)
     }
 
     /// Return a `PartitionEvaluator` for evaluating this window function
     pub fn partition_evaluator_factory(&self) -> Result<Box<dyn 
PartitionEvaluator>> {
-        (self.partition_evaluator_factory)()
+        self.inner.partition_evaluator()
     }
 }
 
@@ -198,6 +186,7 @@ where
 /// # use datafusion_common::{DataFusionError, plan_err, Result};
 /// # use datafusion_expr::{col, Signature, Volatility, PartitionEvaluator, 
WindowFrame};
 /// # use datafusion_expr::{WindowUDFImpl, WindowUDF};
+/// #[derive(Debug, Clone)]
 /// struct SmoothIt {
 ///   signature: Signature
 /// };
@@ -236,7 +225,7 @@ where
 ///     WindowFrame::new(false),
 /// );
 /// ```
-pub trait WindowUDFImpl {
+pub trait WindowUDFImpl: Debug + Send + Sync {
     /// Returns this object as an [`Any`] trait object
     fn as_any(&self) -> &dyn Any;
 
@@ -254,3 +243,52 @@ pub trait WindowUDFImpl {
     /// Invoke the function, returning the [`PartitionEvaluator`] instance
     fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>>;
 }
+
+/// Implementation of [`WindowUDFImpl`] that wraps the function style pointers
+/// of the older API (see <https://github.com/apache/arrow-datafusion/pull/8719>
+/// for more details)
+pub struct WindowUDFLegacyWrapper {
+    /// name
+    name: String,
+    /// signature
+    signature: Signature,
+    /// Return type
+    return_type: ReturnTypeFunction,
+    /// Return the partition evaluator
+    partition_evaluator_factory: PartitionEvaluatorFactory,
+}
+
+impl Debug for WindowUDFLegacyWrapper {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        f.debug_struct("WindowUDF")
+            .field("name", &self.name)
+            .field("signature", &self.signature)
+            .field("return_type", &"<func>")
+            .field("partition_evaluator_factory", &"<func>")
+            .finish_non_exhaustive()
+    }
+}
+
+impl WindowUDFImpl for WindowUDFLegacyWrapper {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        // Old API returns an Arc of the datatype for some reason
+        let res = (self.return_type)(arg_types)?;
+        Ok(res.as_ref().clone())
+    }
+
+    fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
+        (self.partition_evaluator_factory)()
+    }
+}
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 402781e17e..03daf535f2 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -1787,6 +1787,7 @@ fn roundtrip_window() {
         }
     }
 
+    #[derive(Debug, Clone)]
     struct SimpleWindowUDF {
         signature: Signature,
     }

Reply via email to