This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new f13225920 Allow specialized implementations to produce hints for the array adapter (#3765)
f13225920 is described below
commit f1322592046a7ea895820286a2ad16d934540d92
Author: Batuhan Taskaya <[email protected]>
AuthorDate: Tue Oct 11 23:31:02 2022 +0300
Allow specialized implementations to produce hints for the array adapter (#3765)
---
datafusion/physical-expr/src/functions.rs | 190 ++++++++++++++++++++--
datafusion/physical-expr/src/regex_expressions.rs | 21 ++-
2 files changed, 190 insertions(+), 21 deletions(-)
diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs
index 7d9e89b52..6f7b864ef 100644
--- a/datafusion/physical-expr/src/functions.rs
+++ b/datafusion/physical-expr/src/functions.rs
@@ -247,9 +247,34 @@ macro_rules! invoke_if_unicode_expressions_feature_flag {
};
}
+/// Tells [`make_scalar_function_with_hints`] how to expand a [`ScalarValue`]
+/// argument into an array before the wrapped function is called.
+#[derive(Debug, Clone, Copy)]
+pub(crate) enum Hint {
+    /// Indicates the argument needs to be padded if it is scalar: the scalar is
+    /// expanded to the length inferred from the array arguments (1 when that
+    /// length is unknown).
+    Pad,
+    /// Indicates the argument can be converted to an array of length 1, even when
+    /// other arguments are longer arrays.
+    AcceptsSingular,
+}
+
/// decorates a function to handle [`ScalarValue`]s by converting them to
arrays before calling the function
/// and vice-versa after evaluation.
pub fn make_scalar_function<F>(inner: F) -> ScalarFunctionImplementation
+where
+ F: Fn(&[ArrayRef]) -> Result<ArrayRef> + Sync + Send + 'static,
+{
+ make_scalar_function_with_hints(inner, vec![])
+}
+
+/// Just like [`make_scalar_function`], decorates the given function to handle both
+/// [`ScalarValue`]s and arrays. Additionally receives a `hints` vector that controls
+/// how each [`ScalarValue`] argument is expanded into an array.
+///
+/// Each element of `hints` is mapped positionally to the corresponding argument of
+/// the function. The number of hints may be smaller or larger than the number of
+/// arguments (for functions with a variable number of arguments):
+///
+/// * arguments without a corresponding hint default to [`Hint::Pad`];
+/// * hints beyond the last argument are ignored.
+///
+/// Hints only change how scalar arguments are expanded; array arguments keep their
+/// own length.
+pub(crate) fn make_scalar_function_with_hints<F>(
+    inner: F,
+    hints: Vec<Hint>,
+) -> ScalarFunctionImplementation
where
    F: Fn(&[ArrayRef]) -> Result<ArrayRef> + Sync + Send + 'static,
{
@@ -263,16 +288,20 @@ where
                ColumnarValue::Array(a) => Some(a.len()),
            });
-        // to array
-        let args = if let Some(len) = len {
-            args.iter()
-                .map(|arg| arg.clone().into_array(len))
-                .collect::<Vec<ArrayRef>>()
-        } else {
-            args.iter()
-                .map(|arg| arg.clone().into_array(1))
-                .collect::<Vec<ArrayRef>>()
-        };
+        // `len` is presumably `None` only when no argument is an array; scalars then
+        // expand to a single row, matching the previous behaviour.
+        let inferred_length = len.unwrap_or(1);
+        let args = args
+            .iter()
+            // `hints` is followed by an endless run of `Hint::Pad`, so every argument
+            // receives a hint; `zip` stops after the last argument, so any surplus
+            // hints are never consulted.
+            .zip(hints.iter().chain(std::iter::repeat(&Hint::Pad)))
+            .map(|(arg, hint)| {
+                // Decide on the length to expand this scalar to depending
+                // on the given hints.
+                let expansion_len = match hint {
+                    Hint::AcceptsSingular => 1,
+                    Hint::Pad => inferred_length,
+                };
+                arg.clone().into_array(expansion_len)
+            })
+            .collect::<Vec<ArrayRef>>();
        let result = (inner)(&args);
@@ -2887,4 +2916,145 @@ mod tests {
coerce(input_phy_exprs, input_schema,
&function::signature(fun)).unwrap();
create_physical_expr(fun, &type_coerced_phy_exprs, input_schema,
execution_props)
}
+
+    /// Test stand-in for a scalar function: returns a `UInt64Array` holding the
+    /// length of each argument array, in argument order.
+    fn dummy_function(args: &[ArrayRef]) -> Result<ArrayRef> {
+        let lengths: UInt64Array = args
+            .iter()
+            .map(|arg| arg.len() as u64)
+            .map(Some)
+            .collect();
+        Ok(Arc::new(lengths) as ArrayRef)
+    }
+
+    /// Unwraps the result of an adapted test function into the raw values of the
+    /// `UInt64Array` it produced.
+    ///
+    /// Returns `DataFusionError::Internal` (rather than panicking) both when the result
+    /// is a scalar and when the array is not a `UInt64Array`, so the two failure modes
+    /// are reported the same way.
+    fn unpack_uint64_array(col: Result<ColumnarValue>) -> Result<Vec<u64>> {
+        match col? {
+            ColumnarValue::Array(array) => array
+                .as_any()
+                .downcast_ref::<UInt64Array>()
+                .map(|typed| typed.values().to_vec())
+                .ok_or_else(|| {
+                    DataFusionError::Internal(
+                        "Expected a UInt64Array from a test function".to_string(),
+                    )
+                }),
+            ColumnarValue::Scalar(_) => Err(DataFusionError::Internal(
+                "Unexpected scalar created by a test function".to_string(),
+            )),
+        }
+    }
+
+    /// Without hints, a scalar argument is padded to the length of the array argument.
+    #[test]
+    fn test_make_scalar_function() -> Result<()> {
+        let one = ScalarValue::Int64(Some(1));
+        let inputs = [
+            ColumnarValue::Array(one.to_array_of_size(5)),
+            ColumnarValue::Scalar(one),
+        ];
+
+        let adapted = make_scalar_function(dummy_function);
+        let lengths = unpack_uint64_array(adapted(&inputs))?;
+
+        assert_eq!(lengths, vec![5, 5]);
+        Ok(())
+    }
+
+    /// An empty hint list behaves like `make_scalar_function`: the scalar is padded.
+    #[test]
+    fn test_make_scalar_function_with_no_hints() -> Result<()> {
+        let adapted = make_scalar_function_with_hints(dummy_function, Vec::new());
+
+        let one = ScalarValue::Int64(Some(1));
+        let column = ColumnarValue::Array(one.to_array_of_size(5));
+        let scalar = ColumnarValue::Scalar(one);
+
+        let lengths = unpack_uint64_array(adapted(&[column, scalar]))?;
+        assert_eq!(lengths, vec![5, 5]);
+
+        Ok(())
+    }
+
+    /// The array argument keeps its length, while the scalar argument marked
+    /// `AcceptsSingular` is expanded to a single row only.
+    #[test]
+    fn test_make_scalar_function_with_hints() -> Result<()> {
+        let hints = vec![Hint::Pad, Hint::AcceptsSingular];
+        let adapted = make_scalar_function_with_hints(dummy_function, hints);
+
+        let one = ScalarValue::Int64(Some(1));
+        let inputs = [
+            ColumnarValue::Array(one.to_array_of_size(5)),
+            ColumnarValue::Scalar(one),
+        ];
+
+        assert_eq!(unpack_uint64_array(adapted(&inputs))?, vec![5, 1]);
+        Ok(())
+    }
+
+    /// Hints only affect scalars: an array argument keeps its own length even when
+    /// its hint is `AcceptsSingular`.
+    #[test]
+    fn test_make_scalar_function_with_hints_on_arrays() -> Result<()> {
+        let adapted = make_scalar_function_with_hints(
+            dummy_function,
+            vec![Hint::Pad, Hint::AcceptsSingular],
+        );
+
+        let column =
+            ColumnarValue::Array(ScalarValue::Int64(Some(1)).to_array_of_size(5));
+        let inputs = [column.clone(), column];
+
+        let lengths = unpack_uint64_array(adapted(&inputs))?;
+        assert_eq!(lengths, vec![5, 5]);
+        Ok(())
+    }
+
+    /// Hints are applied positionally: `Pad`, `AcceptsSingular`, `Pad` on
+    /// (array, scalar, scalar) yields lengths 5, 1, 5.
+    #[test]
+    fn test_make_scalar_function_with_mixed_hints() -> Result<()> {
+        let one = ScalarValue::Int64(Some(1));
+        let scalar = ColumnarValue::Scalar(one.clone());
+        let column = ColumnarValue::Array(one.to_array_of_size(5));
+
+        let hints = vec![Hint::Pad, Hint::AcceptsSingular, Hint::Pad];
+        let adapted = make_scalar_function_with_hints(dummy_function, hints);
+
+        let inputs = [column, scalar.clone(), scalar];
+        assert_eq!(unpack_uint64_array(adapted(&inputs))?, vec![5, 1, 5]);
+
+        Ok(())
+    }
+
+    /// Four arguments but only three hints: the trailing, unhinted array argument
+    /// still reports its full length.
+    #[test]
+    fn test_make_scalar_function_with_more_arguments_than_hints() -> Result<()> {
+        let hints = vec![Hint::Pad, Hint::AcceptsSingular, Hint::Pad];
+        let adapted = make_scalar_function_with_hints(dummy_function, hints);
+
+        let one = ScalarValue::Int64(Some(1));
+        let scalar = ColumnarValue::Scalar(one.clone());
+        let column = ColumnarValue::Array(one.to_array_of_size(5));
+
+        let inputs = [column.clone(), scalar.clone(), scalar, column];
+        let lengths = unpack_uint64_array(adapted(&inputs))?;
+        assert_eq!(lengths, vec![5, 1, 5, 5]);
+
+        Ok(())
+    }
+
+    /// Six hints but only two arguments: surplus hints are ignored.
+    ///
+    /// NOTE(review): the name appears to be missing "more";
+    /// `test_make_scalar_function_with_more_hints_than_arguments` would mirror
+    /// `test_make_scalar_function_with_more_arguments_than_hints`.
+    #[test]
+    fn test_make_scalar_function_with_hints_than_arguments() -> Result<()> {
+        let hints = vec![
+            Hint::Pad,
+            Hint::AcceptsSingular,
+            Hint::Pad,
+            Hint::Pad,
+            Hint::AcceptsSingular,
+            Hint::Pad,
+        ];
+        let adapted = make_scalar_function_with_hints(dummy_function, hints);
+
+        let one = ScalarValue::Int64(Some(1));
+        let inputs = [
+            ColumnarValue::Array(one.to_array_of_size(5)),
+            ColumnarValue::Scalar(one),
+        ];
+
+        assert_eq!(unpack_uint64_array(adapted(&inputs))?, vec![5, 1]);
+        Ok(())
+    }
}
diff --git a/datafusion/physical-expr/src/regex_expressions.rs b/datafusion/physical-expr/src/regex_expressions.rs
index 6584b1356..d7edc3400 100644
--- a/datafusion/physical-expr/src/regex_expressions.rs
+++ b/datafusion/physical-expr/src/regex_expressions.rs
@@ -33,7 +33,7 @@ use regex::Regex;
use std::any::type_name;
use std::sync::Arc;
-use crate::functions::make_scalar_function;
+use crate::functions::{make_scalar_function, make_scalar_function_with_hints,
Hint};
/// Get the first argument from the given string array.
///
@@ -300,16 +300,15 @@ pub fn specialize_regexp_replace<T: OffsetSizeTrait>(
// we will create many regexes and it is best to use the implementation
// that caches it. If there are no flags, we can simply ignore it here,
// and let the specialized function handle it.
- (_, true, true, true) => {
- // We still don't know the scalarity of source, so we need the
adapter
- // even if it will do some extra work for the pattern and the
flags.
- //
- // TODO: maybe we need a way of telling the adapter on which
arguments
- // it can skip filling (so that we won't create N - 1 redundant
cols).
- Ok(make_scalar_function(
- _regexp_replace_static_pattern_replace::<T>,
- ))
- }
+ (_, true, true, true) => Ok(make_scalar_function_with_hints(
+ _regexp_replace_static_pattern_replace::<T>,
+ vec![
+ Hint::Pad,
+ Hint::AcceptsSingular,
+ Hint::AcceptsSingular,
+ Hint::AcceptsSingular,
+ ],
+ )),
// If there are no specialized implementations, we'll fall back to the
// generic implementation.