korowa commented on code in PR #7385:
URL: https://github.com/apache/arrow-datafusion/pull/7385#discussion_r1303349330
##########
datafusion/physical-expr/src/aggregate/array_agg_distinct.rs:
##########
@@ -195,24 +227,34 @@ mod tests {
));
let actual = aggregate(&batch, agg)?;
- match (expected, actual) {
- (ScalarValue::List(Some(mut e), _), ScalarValue::List(Some(mut a), _)) => {
- // workaround lack of Ord of ScalarValue
- let cmp = |a: &ScalarValue, b: &ScalarValue| {
- a.partial_cmp(b).expect("Can compare ScalarValues")
- };
+ compare_list_contents(expected, actual)
+ }
- e.sort_by(cmp);
- a.sort_by(cmp);
- // Check that the inputs are the same
- assert_eq!(e, a);
- }
- _ => {
- unreachable!()
- }
- }
+ fn check_merge_distinct_array_agg(
+ input1: ArrayRef,
+ input2: ArrayRef,
+ expected: ScalarValue,
+ datatype: DataType,
+ ) -> Result<()> {
+ let schema = Schema::new(vec![Field::new("a", datatype.clone(), false)]);
+ let agg = Arc::new(DistinctArrayAgg::new(
+ col("a", &schema)?,
+ "bla".to_string(),
+ datatype,
+ ));
- Ok(())
+ let mut accum1 = agg.create_accumulator()?;
+ let mut accum2 = agg.create_accumulator()?;
+
+ accum1.update_batch(&[input1])?;
+ accum2.update_batch(&[input2])?;
+
+ let state = get_accum_scalar_values_as_arrays(accum2.as_ref())?;
Review Comment:
True, but two accumulators are enough to validate the `merge_batch` logic.
Regarding `AggregateStream` and the behaviour of these tests:
1) A single `AggregateExec`, and the stream(s) originating from it, can have
multiple accumulators, depending on how many aggregate expressions are used in
the query or subquery.
2) During `aggregate_batch`, all accumulators are updated one after another in
a loop.
3) Aggregation can be performed in two stages. In the first stage (`Partial`
aggregation mode), each accumulator is updated by calling `update_batch` with
the `AggregateExec` input data. The result of partial aggregation is a stream
of the accumulators' intermediate states; this output stream is generated by
[finalize_aggregation](https://github.com/apache/arrow-datafusion/blob/3ffeb52c1c9891e63fcb17db41283d7299af6f18/datafusion/core/src/physical_plan/aggregates/mod.rs#L1075).
4) In the second stage, final aggregation updates each accumulator with
`merge_batch`, using the stream of states from the previous step as input, and
produces a stream of `accumulator.evaluate()` results (again by calling
`finalize_aggregation`, this time with `Final` mode).
This test reproduces that aggregation behaviour for the case of a single
accumulator in `AggregateExec` (there is only one aggregate expression). The
`accum1`/`accum2` variables represent different input streams, not different
expressions:
1) Partial aggregation is emulated by calling `update_batch` on each of the two
accumulators, as if they were aggregating different input partitions.
2) The state of `accum2` is then collected by
`get_accum_scalar_values_as_arrays` (it also returns `Vec<ArrayRef>`), which
emulates the `finalize_aggregation` call for `Partial` mode.
3) The state from step 2 is merged into the first accumulator using
`merge_batch`, as it would be in `Final` aggregation mode (in a real plan there
would be a third accumulator, but two are still enough for testing purposes).
4) Finally, the result produced by `accum1.evaluate()` over the merged state is
compared to the expected value.
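
The four steps above can be sketched without any DataFusion types. This is only an illustration under stated assumptions: `DistinctAccum` and `merge_distinct` are hypothetical names, the struct is not DataFusion's `Accumulator` trait (it only borrows the method names), and plain `i64` slices stand in for `ArrayRef` batches.

```rust
use std::collections::BTreeSet;

/// Hypothetical stand-in for a distinct `array_agg` accumulator.
/// NOT the DataFusion `Accumulator` trait; it only mirrors its method names.
#[derive(Default)]
struct DistinctAccum {
    values: BTreeSet<i64>,
}

impl DistinctAccum {
    /// `Partial` stage: fold raw input rows into the accumulator.
    fn update_batch(&mut self, input: &[i64]) {
        self.values.extend(input.iter().copied());
    }

    /// Intermediate state, as it would be emitted at the end of the
    /// `Partial` stage.
    fn state(&self) -> Vec<i64> {
        self.values.iter().copied().collect()
    }

    /// `Final` stage: fold another accumulator's state into this one.
    fn merge_batch(&mut self, state: &[i64]) {
        self.values.extend(state.iter().copied());
    }

    /// Final result: the distinct values seen so far.
    /// They come out sorted only because this sketch uses a `BTreeSet`.
    fn evaluate(&self) -> Vec<i64> {
        self.values.iter().copied().collect()
    }
}

/// Mirrors the shape of the test: two partitions, one merge, one evaluate.
fn merge_distinct(input1: &[i64], input2: &[i64]) -> Vec<i64> {
    let mut accum1 = DistinctAccum::default();
    let mut accum2 = DistinctAccum::default();

    // Step 1: partial aggregation on each "partition".
    accum1.update_batch(input1);
    accum2.update_batch(input2);

    // Step 2: collect the intermediate state of the second accumulator.
    let state = accum2.state();

    // Step 3: merge that state into the first accumulator.
    accum1.merge_batch(&state);

    // Step 4: evaluate over the merged state.
    accum1.evaluate()
}
```

Calling `merge_distinct(&[1, 2, 2, 3], &[3, 4, 4])` yields the distinct union of both inputs. The sorted order is a property of the `BTreeSet` used here; the actual test sorts both lists before comparing because the real accumulator gives no ordering guarantee.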
I hope I understood your question correctly and that this comment answers it.