korowa commented on code in PR #7385:
URL: https://github.com/apache/arrow-datafusion/pull/7385#discussion_r1303349330


##########
datafusion/physical-expr/src/aggregate/array_agg_distinct.rs:
##########
@@ -195,24 +227,34 @@ mod tests {
         ));
         let actual = aggregate(&batch, agg)?;
 
-        match (expected, actual) {
-            (ScalarValue::List(Some(mut e), _), ScalarValue::List(Some(mut a), _)) => {
-                // workaround lack of Ord of ScalarValue
-                let cmp = |a: &ScalarValue, b: &ScalarValue| {
-                    a.partial_cmp(b).expect("Can compare ScalarValues")
-                };
+        compare_list_contents(expected, actual)
+    }
 
-                e.sort_by(cmp);
-                a.sort_by(cmp);
-                // Check that the inputs are the same
-                assert_eq!(e, a);
-            }
-            _ => {
-                unreachable!()
-            }
-        }
+    fn check_merge_distinct_array_agg(
+        input1: ArrayRef,
+        input2: ArrayRef,
+        expected: ScalarValue,
+        datatype: DataType,
+    ) -> Result<()> {
+        let schema = Schema::new(vec![Field::new("a", datatype.clone(), false)]);
+        let agg = Arc::new(DistinctArrayAgg::new(
+            col("a", &schema)?,
+            "bla".to_string(),
+            datatype,
+        ));
 
-        Ok(())
+        let mut accum1 = agg.create_accumulator()?;
+        let mut accum2 = agg.create_accumulator()?;
+
+        accum1.update_batch(&[input1])?;
+        accum2.update_batch(&[input2])?;
+
+        let state = get_accum_scalar_values_as_arrays(accum2.as_ref())?;

Review Comment:
   True, but two accumulators are enough to validate the `merge_batch` logic.
   
   Regarding `AggregateStream` and the behaviour of these tests:
   1) a single `AggregateExec`, and the stream(s) originating from it, can have multiple accumulators -- depending on how many aggregate expressions are used in the query or subquery
   2) during `aggregate_batch`, all of these accumulators are updated one after another in a loop
   3) aggregation can be performed in two stages -- in the first stage (`Partial` aggregation mode) each accumulator is updated by calling `update_batch` with the `AggregateExec` input data, and the result of partial aggregation is a stream of the accumulators' intermediate states -- this output stream is generated by [finalize_aggregation](https://github.com/apache/arrow-datafusion/blob/3ffeb52c1c9891e63fcb17db41283d7299af6f18/datafusion/core/src/physical_plan/aggregates/mod.rs#L1075)
   4) in the second stage, final aggregation updates each accumulator with `merge_batch`, using the stream of states from the previous step as input, and produces a stream of `accumulator.evaluate()` results (also by calling `finalize_aggregation`, this time with `Final` mode)
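
   The two stages above can be sketched with a toy model. This is not the DataFusion implementation: `AvgAccumulator`, `Mode`, `Output`, `finalize` and `two_stage_avg` are hypothetical names, and plain slices stand in for record batches. An average is used because its intermediate state (sum and count) differs from its final value, which makes the `Partial`/`Final` difference visible.

```rust
// Toy model only: every name below is hypothetical, not a DataFusion type.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Mode {
    Partial,
    Final,
}

/// Average accumulator: the intermediate state is (sum, count),
/// the final value is sum / count.
#[derive(Default)]
struct AvgAccumulator {
    sum: f64,
    count: u64,
}

impl AvgAccumulator {
    /// Used in the partial stage: consumes raw input values.
    fn update_batch(&mut self, values: &[f64]) {
        for v in values {
            self.sum += *v;
            self.count += 1;
        }
    }

    /// Used in the final stage: consumes states emitted by partial accumulators.
    fn merge_batch(&mut self, states: &[(f64, u64)]) {
        for &(sum, count) in states {
            self.sum += sum;
            self.count += count;
        }
    }

    fn evaluate(&self) -> Option<f64> {
        if self.count == 0 {
            None
        } else {
            Some(self.sum / self.count as f64)
        }
    }
}

/// What a stage emits: intermediate state or a final value.
#[derive(Debug, PartialEq)]
enum Output {
    State(f64, u64),
    Value(Option<f64>),
}

/// Emits the state in `Partial` mode and the evaluated result in `Final` mode.
fn finalize(acc: &AvgAccumulator, mode: Mode) -> Output {
    match mode {
        Mode::Partial => Output::State(acc.sum, acc.count),
        Mode::Final => Output::Value(acc.evaluate()),
    }
}

fn two_stage_avg(partitions: &[Vec<f64>]) -> Option<f64> {
    // Stage 1: one accumulator per input partition, each emitting its state.
    let mut states = Vec::new();
    for partition in partitions {
        let mut acc = AvgAccumulator::default();
        acc.update_batch(partition);
        if let Output::State(sum, count) = finalize(&acc, Mode::Partial) {
            states.push((sum, count));
        }
    }

    // Stage 2: a fresh accumulator merges all states and is evaluated.
    let mut final_acc = AvgAccumulator::default();
    final_acc.merge_batch(&states);
    match finalize(&final_acc, Mode::Final) {
        Output::Value(v) => v,
        Output::State(..) => None, // not reached in Final mode
    }
}

fn main() {
    let result = two_stage_avg(&[vec![1.0, 2.0], vec![3.0, 4.0, 5.0]]);
    assert_eq!(result, Some(3.0));
}
```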
   
   This test basically reproduces that aggregation behaviour for the case of a single accumulator in `AggregateExec` (as there is only one aggregate expression); the accum1/accum2 variables represent different input streams, not different expressions:
   1) partial aggregation is emulated by `update_batch` on each of the two accumulators -- as if they were aggregating different input partitions
   2) then the accum2 state is collected by `get_accum_scalar_values_as_arrays` (it also returns `Vec<ArrayRef>`), which emulates the `finalize_aggregation` call for `Partial` mode
   3) the state from step 2 is merged into the first accumulator using `merge_batch` -- as it would work in `Final` aggregation mode (there would be a third accumulator, but 2 is still enough for testing purposes)
   4) finally, the result produced by `accum1.evaluate()` over the merged state is compared to the expected value
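
   The shape of the test can be illustrated without DataFusion. In the sketch below, `DistinctAccumulator` and `merge_distinct` are hypothetical stand-ins: a `HashSet<i32>` replaces the real accumulator state and plain slices replace `ArrayRef` inputs. The result is sorted before comparison because the set has no defined order.

```rust
use std::collections::HashSet;

// Hypothetical stand-in for a distinct array_agg accumulator;
// it only mirrors the update/state/merge/evaluate flow of the test.
#[derive(Default)]
struct DistinctAccumulator {
    values: HashSet<i32>,
}

impl DistinctAccumulator {
    /// Partial stage: add raw input values, dropping duplicates.
    fn update_batch(&mut self, input: &[i32]) {
        self.values.extend(input.iter().copied());
    }

    /// Intermediate state handed from the partial to the final stage.
    fn state(&self) -> Vec<i32> {
        self.values.iter().copied().collect()
    }

    /// Final stage: fold another accumulator's state into this one.
    fn merge_batch(&mut self, state: &[i32]) {
        self.values.extend(state.iter().copied());
    }

    fn evaluate(&self) -> Vec<i32> {
        self.values.iter().copied().collect()
    }
}

fn merge_distinct(input1: &[i32], input2: &[i32]) -> Vec<i32> {
    let mut accum1 = DistinctAccumulator::default();
    let mut accum2 = DistinctAccumulator::default();

    // Step 1: each accumulator sees a different input partition.
    accum1.update_batch(input1);
    accum2.update_batch(input2);

    // Step 2: collect the state of the second accumulator.
    let state = accum2.state();

    // Step 3: merge that state into the first accumulator.
    accum1.merge_batch(&state);

    // Step 4: evaluate, then sort so the comparison ignores set order.
    let mut result = accum1.evaluate();
    result.sort();
    result
}

fn main() {
    let actual = merge_distinct(&[1, 2, 3], &[3, 4, 4, 5]);
    assert_eq!(actual, vec![1, 2, 3, 4, 5]);
}
```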
   
   I hope I understood your question correctly and that this comment answers it.


