metesynnada commented on code in PR #7385:
URL: https://github.com/apache/arrow-datafusion/pull/7385#discussion_r1303147504
##########
datafusion/sqllogictest/test_files/aggregate.slt:
##########
@@ -1280,6 +1280,10 @@ NULL NULL 781 7.81 125 -117 100
# ----
# [4, 2, 3, 5, 1]
+# additional count(1) forces array_agg_distinct instead of array_agg over
aggregated by c2 data
+statement ok
+SELECT array_agg(distinct c2), count(1) FROM aggregate_test_100
Review Comment:
Can you verify the output as well?
##########
datafusion/physical-expr/src/aggregate/array_agg_distinct.rs:
##########
@@ -195,24 +227,34 @@ mod tests {
));
let actual = aggregate(&batch, agg)?;
- match (expected, actual) {
- (ScalarValue::List(Some(mut e), _), ScalarValue::List(Some(mut a),
_)) => {
- // workaround lack of Ord of ScalarValue
- let cmp = |a: &ScalarValue, b: &ScalarValue| {
- a.partial_cmp(b).expect("Can compare ScalarValues")
- };
+ compare_list_contents(expected, actual)
+ }
- e.sort_by(cmp);
- a.sort_by(cmp);
- // Check that the inputs are the same
- assert_eq!(e, a);
- }
- _ => {
- unreachable!()
- }
- }
+ fn check_merge_distinct_array_agg(
+ input1: ArrayRef,
+ input2: ArrayRef,
+ expected: ScalarValue,
+ datatype: DataType,
+ ) -> Result<()> {
+ let schema = Schema::new(vec![Field::new("a", datatype.clone(),
false)]);
+ let agg = Arc::new(DistinctArrayAgg::new(
+ col("a", &schema)?,
+ "bla".to_string(),
+ datatype,
+ ));
- Ok(())
+ let mut accum1 = agg.create_accumulator()?;
+ let mut accum2 = agg.create_accumulator()?;
+
+ accum1.update_batch(&[input1])?;
+ accum2.update_batch(&[input2])?;
+
+ let state = get_accum_scalar_values_as_arrays(accum2.as_ref())?;
Review Comment:
I think there can be more than 2 accumulators. However, what I do not
understand is that `AggregateStream` calls `aggregate_batch` with multiple
states involved, like this:
```rust
let values = &expr
.iter()
.map(|e| e.evaluate(&batch))
.map(|r| r.map(|v| v.into_array(batch.num_rows())))
.collect::<Result<Vec<_>>>()?;
// 1.4
let size_pre = accum.size();
let res = match mode {
AggregateMode::Partial
| AggregateMode::Single
| AggregateMode::SinglePartitioned => accum.update_batch(values),
AggregateMode::Final | AggregateMode::FinalPartitioned => {
accum.merge_batch(values)
}
};
```
where `values` is a `Vec<ArrayRef>`, but this test seems to take a different
approach. Can you enlighten me about the procedure?
##########
datafusion/physical-expr/src/aggregate/array_agg_distinct.rs:
##########
@@ -147,11 +146,21 @@ impl Accumulator for DistinctArrayAggAccumulator {
return Ok(());
}
- for array in states {
- for j in 0..array.len() {
- self.values.insert(ScalarValue::try_from_array(array, j)?);
+ assert!(
+ states.len() == 1,
+ "array_agg_distinct states must contain single array"
+ );
+
+ let state = &states[0];
+ (0..state.len()).try_for_each(|i| {
+ let scalar = ScalarValue::try_from_array(state, i)?;
Review Comment:
```suggestion
let array = &states[0];
(0..array.len()).try_for_each(|i| {
let scalar = ScalarValue::try_from_array(array, i)?;
```
##########
datafusion/physical-expr/src/aggregate/array_agg_distinct.rs:
##########
@@ -147,11 +146,21 @@ impl Accumulator for DistinctArrayAggAccumulator {
return Ok(());
}
- for array in states {
- for j in 0..array.len() {
- self.values.insert(ScalarValue::try_from_array(array, j)?);
+ assert!(
+ states.len() == 1,
+ "array_agg_distinct states must contain single array"
+ );
Review Comment:
```suggestion
assert_eq!(
states.len(),
1,
"array_agg_distinct states must contain single array"
);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]