alamb commented on code in PR #9031:
URL: https://github.com/apache/arrow-datafusion/pull/9031#discussion_r1470226144
##########
datafusion/core/tests/user_defined/user_defined_scalar_functions.rs:
##########
@@ -392,6 +396,112 @@ async fn test_user_defined_functions_with_alias() ->
Result<()> {
Ok(())
}
+#[derive(Debug)]
+pub struct RandomUDF {
+ signature: Signature,
+}
+
+impl RandomUDF {
+ pub fn new() -> Self {
+ Self {
+ signature: Signature::one_of(
+ vec![Any(0), Variadic(vec![Float64])],
+ Volatility::Volatile,
+ ),
+ }
+ }
+}
+
+impl ScalarUDFImpl for RandomUDF {
+ fn as_any(&self) -> &dyn std::any::Any {
+ self
+ }
+
+ fn name(&self) -> &str {
+ "random_udf"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+ Ok(Float64)
+ }
+
+ fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+ let len: usize = match &args[0] {
+ ColumnarValue::Array(array) => array.len(),
+ _ => {
+ return Err(datafusion::error::DataFusionError::Internal(
+ "Invalid argument type".to_string(),
+ ))
+ }
+ };
+ let mut rng = thread_rng();
+ let values = iter::repeat_with(|| rng.gen_range(0.1..1.0)).take(len);
+ let array = Float64Array::from_iter_values(values);
+ Ok(ColumnarValue::Array(Arc::new(array)))
+ }
+}
+
+#[tokio::test]
+async fn test_user_defined_functions_zero_argument() -> Result<()> {
+ let ctx = SessionContext::new();
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("index", DataType::UInt8, false),
+ Field::new("uint", DataType::UInt64, true),
+ Field::new("int", DataType::Int64, true),
+ Field::new("float", DataType::Float64, true),
+ ]));
+
+ let batch = RecordBatch::try_new(
+ schema,
+ vec![
+ Arc::new(UInt8Array::from_iter_values([1, 2, 3])),
+ Arc::new(UInt64Array::from(vec![Some(2), Some(3), None])),
+ Arc::new(Int64Array::from(vec![Some(-2), Some(3), None])),
+ Arc::new(Float64Array::from(vec![Some(1.0), Some(3.3), None])),
+ ],
+ )?;
+
+ ctx.register_batch("data_table", batch)?;
+
+ let random_normal_udf = ScalarUDF::from(RandomUDF::new());
+ ctx.register_udf(random_normal_udf);
+
+ let result = plan_and_collect(
+ &ctx,
+ "SELECT random_udf() AS random_udf, random() AS native_random FROM
data_table",
+ )
+ .await?;
+
+ assert_eq!(result.len(), 1);
+ let batch = &result[0];
+ let random_udf = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<Float64Array>()
+ .unwrap();
+ let native_random = batch
+ .column(1)
+ .as_any()
+ .downcast_ref::<Float64Array>()
+ .unwrap();
+
+ assert_eq!(random_udf.len(), native_random.len());
+
+ let mut previous = 1.0;
Review Comment:
Could the random implementation ever actually produce 1.0? (The range is
`0..1.0`, which excludes 1.0.) Maybe we could start at -1.0 or something, just to be
sure this won't ever flake.
##########
datafusion/core/tests/user_defined/user_defined_scalar_functions.rs:
##########
@@ -392,6 +396,112 @@ async fn test_user_defined_functions_with_alias() ->
Result<()> {
Ok(())
}
+#[derive(Debug)]
+pub struct RandomUDF {
+ signature: Signature,
+}
+
+impl RandomUDF {
+ pub fn new() -> Self {
+ Self {
+ signature: Signature::one_of(
+ vec![Any(0), Variadic(vec![Float64])],
+ Volatility::Volatile,
+ ),
+ }
+ }
+}
+
+impl ScalarUDFImpl for RandomUDF {
+ fn as_any(&self) -> &dyn std::any::Any {
+ self
+ }
+
+ fn name(&self) -> &str {
+ "random_udf"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+ Ok(Float64)
+ }
+
+ fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+ let len: usize = match &args[0] {
+ ColumnarValue::Array(array) => array.len(),
+ _ => {
+ return Err(datafusion::error::DataFusionError::Internal(
+ "Invalid argument type".to_string(),
+ ))
+ }
+ };
+ let mut rng = thread_rng();
+ let values = iter::repeat_with(|| rng.gen_range(0.1..1.0)).take(len);
+ let array = Float64Array::from_iter_values(values);
+ Ok(ColumnarValue::Array(Arc::new(array)))
+ }
+}
+
+#[tokio::test]
+async fn test_user_defined_functions_zero_argument() -> Result<()> {
+ let ctx = SessionContext::new();
+
+ let schema = Arc::new(Schema::new(vec![
Review Comment:
It doesn't hurt, but I wonder whether the example table really needs 4 columns 🤔
##########
datafusion/core/tests/user_defined/user_defined_scalar_functions.rs:
##########
@@ -392,6 +396,112 @@ async fn test_user_defined_functions_with_alias() ->
Result<()> {
Ok(())
}
+#[derive(Debug)]
+pub struct RandomUDF {
+ signature: Signature,
+}
+
+impl RandomUDF {
+ pub fn new() -> Self {
+ Self {
+ signature: Signature::one_of(
+ vec![Any(0), Variadic(vec![Float64])],
+ Volatility::Volatile,
+ ),
+ }
+ }
+}
+
+impl ScalarUDFImpl for RandomUDF {
+ fn as_any(&self) -> &dyn std::any::Any {
+ self
+ }
+
+ fn name(&self) -> &str {
+ "random_udf"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+ Ok(Float64)
+ }
+
+ fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+ let len: usize = match &args[0] {
+ ColumnarValue::Array(array) => array.len(),
+ _ => {
+ return Err(datafusion::error::DataFusionError::Internal(
+ "Invalid argument type".to_string(),
+ ))
+ }
+ };
+ let mut rng = thread_rng();
+ let values = iter::repeat_with(|| rng.gen_range(0.1..1.0)).take(len);
+ let array = Float64Array::from_iter_values(values);
+ Ok(ColumnarValue::Array(Arc::new(array)))
+ }
+}
+
+#[tokio::test]
Review Comment:
I think it would help to add context here about what this test ensures:
```suggestion
/// Ensure that a volatile user defined function will be invoked for every row (not once)
#[tokio::test]
```
##########
datafusion/physical-expr/src/scalar_function.rs:
##########
@@ -149,6 +153,11 @@ impl PhysicalExpr for ScalarFunctionExpr {
{
vec![ColumnarValue::create_null_array(batch.num_rows())]
}
+ // If the function supports zero argument, we pass in a null array indicating the batch size.
Review Comment:
I never fully understood why this didn't just check `self.args.is_empty()` 🤔
##########
datafusion/physical-expr/src/scalar_function.rs:
##########
@@ -58,6 +58,8 @@ pub struct ScalarFunctionExpr {
// and it specifies the effect of an increase or decrease in
// the corresponding `arg` to the function value.
monotonicity: Option<FuncMonotonicity>,
+ // Signature of the function
+ signature: Signature,
Review Comment:
It seems like only one field of the signature is ever read. I wonder if it would be better
to copy just that field rather than the entire signature, which is larger (it involves several
allocations) and could also be misleading, since it suggests that the signature information is
used more extensively in execution plans.
I worry that the signature information might start being referred to during physical
planning.
So perhaps something like:
```suggestion
    // Does this function need to be invoked with zero arguments?
    supports_zero_argument: bool,
```
populated when the expression is constructed from:
```rust
self.signature.type_signature.supports_zero_argument
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]