alamb commented on code in PR #7242:
URL: https://github.com/apache/arrow-datafusion/pull/7242#discussion_r1369251483
##########
datafusion/common/src/scalar.rs:
##########
@@ -763,6 +763,42 @@ impl ScalarValue {
}
}
+ // ListArray compatible version of new_primitive
Review Comment:
```suggestion
/// Return a new `ScalarValue::List` given a `Vec` of primitive values
```
##########
datafusion/physical-expr/src/aggregate/sum.rs:
##########
@@ -191,12 +196,35 @@ impl<T: ArrowNumericType> Accumulator for
SumAccumulator<T> {
Ok(vec![self.evaluate()?])
}
+ // There are two kinds of input, PrimitiveArray and ListArray
+ // ListArray is for multiple-rows input, and PrimitiveArray is for
single-row input
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
- let values = values[0].as_primitive::<T>();
- if let Some(x) = sum(values) {
- let v = self.sum.get_or_insert(T::Native::usize_as(0));
- *v = v.add_wrapping(x);
+ // Wrap single-row input into multiple-rows input and use the same
logic as multiple-rows input
+ let list_values = match as_list_array(&values[0]) {
Review Comment:
Can we please not ignore the error here? Each error requires a string
allocation and this is the performance critical inner loop
##########
datafusion/sql/src/expr/function.rs:
##########
@@ -59,6 +60,26 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
if let Ok(fun) = BuiltinScalarFunction::from_str(&name) {
let args =
self.function_args_to_expr(function.args, schema,
planner_context)?;
+
+ // Translate array_aggregate to aggregate function with array
argument.
+ if fun == BuiltinScalarFunction::ArrayAggregate {
Review Comment:
If we do the rewrite here, it will only apply to SQL (so `array_aggregate`
will not work if it is constructed via the dataframe API or an `Expr`
directly)
##########
datafusion/physical-expr/src/aggregate/sum.rs:
##########
@@ -167,7 +172,7 @@ impl PartialEq<dyn Any> for Sum {
/// This accumulator computes SUM incrementally
struct SumAccumulator<T: ArrowNumericType> {
- sum: Option<T::Native>,
+ sum: Vec<Option<T::Native>>,
Review Comment:
I think this change should have a comment that explains what the `Vec` is
for and how it is used (as it now supports ListArrays, each element represents
the relative partial sum for the relevant list element, etc.)
##########
datafusion/common/src/scalar.rs:
##########
@@ -763,6 +763,42 @@ impl ScalarValue {
}
}
+ // ListArray compatible version of new_primitive
+ pub fn new_primitives<T: ArrowPrimitiveType>(
+ values: Vec<Option<T::Native>>,
+ d: &DataType,
+ ) -> Result<Self> {
+ if values.is_empty() {
+ return d.try_into();
+ }
+
+ // We need to convert it to ScalarValue::Primitive (Int64(0)) instead
of ScalarValue::List (List([PrimitiveArray<Int64> [0]]))
+ if values.len() == 1 {
+ return Self::new_primitive::<T>(values[0], d);
Review Comment:
I think I would find this behavior very surprising -- that sometimes it
returns a ScalarValue::List and sometimes a ScalarValue::Int8, etc -- I realize
it is what your changes to sum need, but given it is so specialized, I think it
would make more sense to check for a length of 1 at the callsite
##########
datafusion/physical-expr/src/aggregate/sum.rs:
##########
@@ -291,6 +309,27 @@ pub(crate) fn sum_batch(values: &ArrayRef, sum_type:
&DataType) -> Result<Scalar
DataType::UInt32 => typed_sum_delta_batch!(values, UInt32Array,
UInt32),
DataType::UInt16 => typed_sum_delta_batch!(values, UInt16Array,
UInt16),
DataType::UInt8 => typed_sum_delta_batch!(values, UInt8Array, UInt8),
+ DataType::List(field) => {
+ let array = values.as_list::<i32>();
+ let mut scalars: Vec<ScalarValue> = vec![];
+ for arr in array.iter() {
+ if let Some(arr) = arr {
+ let sum = sum_batch(&arr, field.data_type())?;
+ match sum {
+ ColumnarValue::Scalar(sv) => {
+ scalars.push(sv);
+ }
+ e => {
+ return Err(DataFusionError::Internal(format!(
+ "Sum is not expected to receive the type {e:?}"
+ )));
+ }
+ }
+ }
+ }
+ let arr = ScalarValue::iter_to_array(scalars.into_iter())?;
Review Comment:
so in this case, I was thinking `sum` would be called on the values of the
`list`, not the list itself 🤔 -- could we call `evaluate` on each scalar 🤔
##########
datafusion/common/src/scalar.rs:
##########
@@ -763,6 +763,42 @@ impl ScalarValue {
}
}
+ // ListArray compatible version of new_primitive
+ pub fn new_primitives<T: ArrowPrimitiveType>(
+ values: Vec<Option<T::Native>>,
+ d: &DataType,
+ ) -> Result<Self> {
+ if values.is_empty() {
+ return d.try_into();
+ }
+
+ // We need to convert it to ScalarValue::Primitive (Int64(0)) instead
of ScalarValue::List (List([PrimitiveArray<Int64> [0]]))
Review Comment:
As a suggestion for future PRs, I think comments are best when they describe
the *why* of the code (rather than the what) -- in this case I think the why is
because this function is doing something very special depending on what mode
the accumulator is in.
##########
datafusion/physical-expr/src/aggregate/sum.rs:
##########
@@ -167,7 +172,7 @@ impl PartialEq<dyn Any> for Sum {
/// This accumulator computes SUM incrementally
Review Comment:
I am very worried about changing how the Sum accumulator works, as the Sum
accumulator is one of the performance critical aggregators.
While it would result in duplicated code, I think in this case it would make
sense to have a separate accumulator implementation for `array_sum` given this
concern
##########
datafusion/physical-expr/src/aggregate/sum.rs:
##########
@@ -291,6 +309,27 @@ pub(crate) fn sum_batch(values: &ArrayRef, sum_type:
&DataType) -> Result<Scalar
DataType::UInt32 => typed_sum_delta_batch!(values, UInt32Array,
UInt32),
DataType::UInt16 => typed_sum_delta_batch!(values, UInt16Array,
UInt16),
DataType::UInt8 => typed_sum_delta_batch!(values, UInt8Array, UInt8),
+ DataType::List(field) => {
+ let array = values.as_list::<i32>();
+ let mut scalars: Vec<ScalarValue> = vec![];
+ for arr in array.iter() {
+ if let Some(arr) = arr {
+ let sum = sum_batch(&arr, field.data_type())?;
+ match sum {
+ ColumnarValue::Scalar(sv) => {
+ scalars.push(sv);
+ }
+ e => {
+ return Err(DataFusionError::Internal(format!(
+ "Sum is not expected to receive the type {e:?}"
+ )));
+ }
+ }
+ }
+ }
+ let arr = ScalarValue::iter_to_array(scalars.into_iter())?;
Review Comment:
so in this case, I was thinking `sum` would be called on the values of the
`list`, not the list itself 🤔 -- could we call `evaluate` on each scalar 🤔
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]