alamb commented on code in PR #13001:
URL: https://github.com/apache/datafusion/pull/13001#discussion_r1806933063
##########
datafusion/core/tests/user_defined/user_defined_scalar_functions.rs:
##########
@@ -483,6 +485,188 @@ async fn test_user_defined_functions_with_alias() ->
Result<()> {
Ok(())
}
+struct AddIndexToStringScalarUDF {
+ name: String,
+ signature: Signature,
+ return_type: DataType,
+}
+
+impl AddIndexToStringScalarUDF {
+ fn new() -> Self {
+ Self {
+ name: "add_index_to_string".to_string(),
+ signature: Signature::exact(vec![DataType::Utf8],
Volatility::Volatile),
+ return_type: DataType::Utf8,
+ }
+ }
+}
+
+impl std::fmt::Debug for AddIndexToStringScalarUDF {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ f.debug_struct("ScalarUDF")
+ .field("name", &self.name)
+ .field("signature", &self.signature)
+ .field("fun", &"<FUNC>")
+ .finish()
+ }
+}
+
+impl ScalarUDFImpl for AddIndexToStringScalarUDF {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn name(&self) -> &str {
+ &self.name
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+ Ok(self.return_type.clone())
+ }
+
+ fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
+ not_impl_err!("index_with_offset function does not accept arguments")
+ }
+
+ fn invoke_batch(
+ &self,
+ _args: &[ColumnarValue],
+ _number_rows: usize,
+ ) -> Result<ColumnarValue> {
+ let answer = match &_args[0] {
+ ColumnarValue::Scalar(ScalarValue::Utf8(Some(value))) => {
+ let mut answer = vec![];
+ for index in 1..=_number_rows {
+ answer.push(index.to_string() + ") " + value);
+ }
+ answer
+ }
+ ColumnarValue::Array(array) => {
+ let string_array = as_string_array(array);
+ let mut counter = HashMap::<&str, u64>::new();
+ string_array
+ .iter()
+ .map(|value| {
+ let value = value.expect("Unexpected null");
+ let index = counter.get(value).unwrap_or(&0) + 1;
+ counter.insert(value, index);
+ index.to_string() + ". " + value
+ })
+ .collect()
+ }
+ _ => unimplemented!(),
+ };
+ Ok(ColumnarValue::Array(
+ Arc::new(StringArray::from(answer)) as ArrayRef
+ ))
+ }
+}
+
+#[tokio::test]
+async fn volatile_scalar_udf_with_params() -> Result<()> {
+ {
+ let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);
+
+ let batch = RecordBatch::try_new(
+ Arc::new(schema.clone()),
+ vec![Arc::new(StringArray::from(vec![
+ "test_1", "test_1", "test_1", "test_2", "test_2", "test_1",
"test_2",
+ ]))],
+ )?;
+ let ctx = SessionContext::new();
+
+ ctx.register_batch("t", batch)?;
+
+ let get_new_str_udf = AddIndexToStringScalarUDF::new();
+
+ ctx.register_udf(ScalarUDF::from(get_new_str_udf));
+
+ let result =
+ plan_and_collect(&ctx, "select add_index_to_string(t.a) AS str
from t") // with dynamic function parameters
+ .await?;
+ let expected = [
+ "+-----------+", //
+ "| str |", //
+ "+-----------+", //
+ "| 1. test_1 |", //
Review Comment:
What is the meaning of the trailing `//`?
Also, it seems like the indexes repeat (multiple rows start with 1), which
implies invoke is run multiple times - perhaps we could set `target_partitions`
to 1 on the SessionContext so the data isn't repartitioned?
##########
datafusion/expr/src/udf.rs:
##########
@@ -469,6 +480,21 @@ pub trait ScalarUDFImpl: Debug + Send + Sync {
/// [invoke_no_args]: ScalarUDFImpl::invoke_no_args
fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue>;
+ /// Invoke the function with `args` and the number of rows,
+ /// returning the appropriate result.
+ /// The function is called for signatures with
[`datafusion_expr_common::signature::Volatility::Volatile`]
+ /// and with arguments
+ fn invoke_batch(
+ &self,
+ _args: &[ColumnarValue],
+ _number_rows: usize,
+ ) -> Result<ColumnarValue> {
+ not_impl_err!(
+ "Function {} does not implement invoke_batch but called",
+ self.name()
+ )
Review Comment:
🤔 It seems like the ideal outcome would be for all ScalarUDFs to implement
this method (as it covers `invoke`, `invoke_no_args` as well).
Would you be open to changing this so it uses a default implementation like
this?
```suggestion
if _args.is_empty() {
self.invoke_no_args(_number_rows)
} else {
self.invoke(_args)
}
```
Then the function implementation could decide what to do with that
information
##########
datafusion/core/tests/user_defined/user_defined_scalar_functions.rs:
##########
@@ -483,6 +485,188 @@ async fn test_user_defined_functions_with_alias() ->
Result<()> {
Ok(())
}
+struct AddIndexToStringScalarUDF {
Review Comment:
Perhaps we can add some comments here explaining what this is:
```suggestion
/// Volatile UDF that should append a different value to each row
struct AddIndexToStringScalarUDF {
```
##########
datafusion/physical-expr/src/scalar_function.rs:
##########
@@ -143,7 +143,10 @@ impl PhysicalExpr for ScalarFunctionExpr {
// evaluate the function
let output = match self.args.is_empty() {
true => self.fun.invoke_no_args(batch.num_rows()),
- false => self.fun.invoke(&inputs),
+ false => match self.fun.signature().volatility {
Review Comment:
If you modified invoke_batch as above, we could change this code to simply
call `self.fun.invoke_batch()` always
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]