alamb commented on code in PR #13001:
URL: https://github.com/apache/datafusion/pull/13001#discussion_r1806933063


##########
datafusion/core/tests/user_defined/user_defined_scalar_functions.rs:
##########
@@ -483,6 +485,188 @@ async fn test_user_defined_functions_with_alias() -> 
Result<()> {
     Ok(())
 }
 
+struct AddIndexToStringScalarUDF {
+    name: String,
+    signature: Signature,
+    return_type: DataType,
+}
+
+impl AddIndexToStringScalarUDF {
+    fn new() -> Self {
+        Self {
+            name: "add_index_to_string".to_string(),
+            signature: Signature::exact(vec![DataType::Utf8], 
Volatility::Volatile),
+            return_type: DataType::Utf8,
+        }
+    }
+}
+
+impl std::fmt::Debug for AddIndexToStringScalarUDF {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        f.debug_struct("ScalarUDF")
+            .field("name", &self.name)
+            .field("signature", &self.signature)
+            .field("fun", &"<FUNC>")
+            .finish()
+    }
+}
+
+impl ScalarUDFImpl for AddIndexToStringScalarUDF {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        Ok(self.return_type.clone())
+    }
+
+    fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
+        not_impl_err!("index_with_offset function does not accept arguments")
+    }
+
+    fn invoke_batch(
+        &self,
+        _args: &[ColumnarValue],
+        _number_rows: usize,
+    ) -> Result<ColumnarValue> {
+        let answer = match &_args[0] {
+            ColumnarValue::Scalar(ScalarValue::Utf8(Some(value))) => {
+                let mut answer = vec![];
+                for index in 1..=_number_rows {
+                    answer.push(index.to_string() + ") " + value);
+                }
+                answer
+            }
+            ColumnarValue::Array(array) => {
+                let string_array = as_string_array(array);
+                let mut counter = HashMap::<&str, u64>::new();
+                string_array
+                    .iter()
+                    .map(|value| {
+                        let value = value.expect("Unexpected null");
+                        let index = counter.get(value).unwrap_or(&0) + 1;
+                        counter.insert(value, index);
+                        index.to_string() + ". " + value
+                    })
+                    .collect()
+            }
+            _ => unimplemented!(),
+        };
+        Ok(ColumnarValue::Array(
+            Arc::new(StringArray::from(answer)) as ArrayRef
+        ))
+    }
+}
+
+#[tokio::test]
+async fn volatile_scalar_udf_with_params() -> Result<()> {
+    {
+        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);
+
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(StringArray::from(vec![
+                "test_1", "test_1", "test_1", "test_2", "test_2", "test_1", 
"test_2",
+            ]))],
+        )?;
+        let ctx = SessionContext::new();
+
+        ctx.register_batch("t", batch)?;
+
+        let get_new_str_udf = AddIndexToStringScalarUDF::new();
+
+        ctx.register_udf(ScalarUDF::from(get_new_str_udf));
+
+        let result =
+            plan_and_collect(&ctx, "select add_index_to_string(t.a) AS str 
from t") // with dynamic function parameters
+                .await?;
+        let expected = [
+            "+-----------+", //
+            "| str       |", //
+            "+-----------+", //
+            "| 1. test_1 |", //

Review Comment:
   What is the meaning of the trailing `//`?
   
   Also, the indexes seem to repeat (several rows have index 1), which implies that
`invoke` is run multiple times. Perhaps we could set `target_partitions` to 1 on the
`SessionContext` so the data isn't repartitioned?



##########
datafusion/expr/src/udf.rs:
##########
@@ -469,6 +480,21 @@ pub trait ScalarUDFImpl: Debug + Send + Sync {
     /// [invoke_no_args]: ScalarUDFImpl::invoke_no_args
     fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue>;
 
+    /// Invoke the function with `args` and the number of rows,
+    /// returning the appropriate result.
+    /// The function is called for signatures with 
[`datafusion_expr_common::signature::Volatility::Volatile`]
+    /// and with arguments
+    fn invoke_batch(
+        &self,
+        _args: &[ColumnarValue],
+        _number_rows: usize,
+    ) -> Result<ColumnarValue> {
+        not_impl_err!(
+            "Function {} does not implement invoke_batch but called",
+            self.name()
+        )

Review Comment:
   🤔 It seems like the ideal outcome would be for all ScalarUDFs to implement
this method (as it covers both `invoke` and `invoke_no_args`).
   
   Would you be open to changing this so it uses a default implementation like 
this?
   
   ```suggestion
     if _args.empty() { 
       self.invoke_no_args(number_rows) 
     } else {
       self.invoke(args)
     } 
   ```
   
   Then the function implementation could decide what to do with that
information.
   
   



##########
datafusion/core/tests/user_defined/user_defined_scalar_functions.rs:
##########
@@ -483,6 +485,188 @@ async fn test_user_defined_functions_with_alias() -> 
Result<()> {
     Ok(())
 }
 
+struct AddIndexToStringScalarUDF {

Review Comment:
   Perhaps we can add some comments here explaining what this is:
   
   ```suggestion
   /// Volatile UDF that should append a different value to each row
   struct AddIndexToStringScalarUDF {
   ```



##########
datafusion/physical-expr/src/scalar_function.rs:
##########
@@ -143,7 +143,10 @@ impl PhysicalExpr for ScalarFunctionExpr {
         // evaluate the function
         let output = match self.args.is_empty() {
             true => self.fun.invoke_no_args(batch.num_rows()),
-            false => self.fun.invoke(&inputs),
+            false => match self.fun.signature().volatility {

Review Comment:
   If you modify `invoke_batch` as above, we could change this code to always
call `self.fun.invoke_batch()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to