thinkharderdev opened a new issue, #11291:
URL: https://github.com/apache/datafusion/issues/11291
### Describe the bug
The `InputOrderMode` in `AggregateExec` does not properly account for
grouping sets and can produce incorrect aggregation results when a grouping
set contains a constant expression.
The underlying issue seems to be that, because we consider a constant
expression to be trivially ordered, we assume that we can emit a group as soon
as a new value of the constant expression is seen. This doesn't hold for
grouping sets, though, because they are expanded with a `null` value for the
group key, so the key column no longer holds a single value. As a result,
groups are emitted too early, which ultimately produces invalid aggregation
results.
This would happen any time you have a query like:
```
SELECT a, b, c, count(1) FROM table
GROUP BY GROUPING SETS ((1 AS a), (b), (c))
```
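The failure mode described above can be illustrated with a toy model. This is a minimal sketch using only the standard library, not DataFusion's implementation: `Key`, `expand`, `count_assuming_sorted`, and `count_hashed` are hypothetical names, and the expansion into exactly two grouping sets is an assumption made to keep the example small. It shows that a streaming count which trusts the key to be ordered emits the same group several times once `null` keys are interleaved with the constant.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the expanded group key: `Some(1)` where the
/// constant is kept, `None` where the grouping set nulls it out.
type Key = Option<i32>;

/// Simulates grouping-set expansion (assumed shape): every input batch
/// contributes `rows` keys for each of two grouping sets.
fn expand(batches: usize, rows: usize) -> Vec<Key> {
    let mut keys = Vec::new();
    for _ in 0..batches {
        keys.extend(std::iter::repeat(Some(1)).take(rows));
        keys.extend(std::iter::repeat(None).take(rows));
    }
    keys
}

/// Streaming count that assumes the input is sorted on the key, so it
/// closes the current group whenever the key changes.
fn count_assuming_sorted(keys: &[Key]) -> Vec<(Key, usize)> {
    let mut out: Vec<(Key, usize)> = Vec::new();
    for &k in keys {
        if let Some((last, n)) = out.last_mut() {
            if *last == k {
                *n += 1;
                continue;
            }
        }
        // Key changed (or first row): start a new output group.
        out.push((k, 1));
    }
    out
}

/// Hash-based count that makes no ordering assumption.
fn count_hashed(keys: &[Key]) -> HashMap<Key, usize> {
    let mut counts = HashMap::new();
    for &k in keys {
        *counts.entry(k).or_insert(0) += 1;
    }
    counts
}

fn main() {
    let keys = expand(4, 8192);
    // The ordered strategy emits one group per run of equal keys.
    println!("streamed groups: {}", count_assuming_sorted(&keys).len()); // prints 8
    // The hashed strategy emits one group per distinct key.
    println!("hashed groups: {}", count_hashed(&keys).len()); // prints 2
}
```

In this model the hashed count yields two groups of 32768 rows each, while the order-trusting count yields eight fragments of 8192 rows. The fragment sizes differ from the real failure output, since the model ignores how DataFusion actually batches and emits groups.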
### To Reproduce
Simplest test I could come up with to reproduce the issue:
```
#[tokio::test]
async fn test_agg_exec_group_by_const() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Float32, true),
        Field::new("b", DataType::Float32, true),
        Field::new("const", DataType::Int32, false),
    ]));

    let col_a = col("a", &schema)?;
    let col_b = col("b", &schema)?;
    let const_expr = Arc::new(Literal::new(ScalarValue::Int32(Some(1))));

    let groups = PhysicalGroupBy::new(
        // Group-by expressions.
        vec![
            (col_a, "a".to_string()),
            (col_b, "b".to_string()),
            (const_expr, "const".to_string()),
        ],
        // Null expressions, one per group-by expression.
        vec![
            (
                Arc::new(Literal::new(ScalarValue::Float32(None))),
                "a".to_string(),
            ),
            (
                Arc::new(Literal::new(ScalarValue::Float32(None))),
                "b".to_string(),
            ),
            (
                Arc::new(Literal::new(ScalarValue::Int32(None))),
                "const".to_string(),
            ),
        ],
        // One boolean mask per grouping set.
        vec![
            vec![true, false, false],
            vec![false, true, false],
            vec![false, false, true],
        ],
    );

    let aggregates: Vec<Arc<dyn AggregateExpr>> = vec![create_aggregate_expr(
        count_udaf().as_ref(),
        &[lit(1)],
        &[Expr::Literal(ScalarValue::Int32(Some(1)))],
        &[],
        &[],
        schema.as_ref(),
        "1",
        false,
        false,
    )?];

    // Four identical batches of 8192 rows each, all in a single partition.
    let input_batches = (0..4)
        .map(|_| {
            let a = Arc::new(Float32Array::from(vec![0.; 8192]));
            let b = Arc::new(Float32Array::from(vec![0.; 8192]));
            let c = Arc::new(Int32Array::from(vec![1; 8192]));
            RecordBatch::try_new(schema.clone(), vec![a, b, c]).unwrap()
        })
        .collect();

    let input =
        Arc::new(MemoryExec::try_new(&[input_batches], schema.clone(), None)?);

    let aggregate_exec = Arc::new(AggregateExec::try_new(
        AggregateMode::Partial,
        groups,
        aggregates.clone(),
        vec![None],
        input,
        schema,
    )?);

    let output =
        collect(aggregate_exec.execute(0, Arc::new(TaskContext::default()))?).await?;

    let expected = vec![
        "+-----+-----+-------+----------+",
        "| a   | b   | const | 1[count] |",
        "+-----+-----+-------+----------+",
        "|     | 0.0 |       | 32768    |",
        "| 0.0 |     |       | 32768    |",
        "|     |     | 1     | 32768    |",
        "+-----+-----+-------+----------+",
    ];
    assert_batches_sorted_eq!(expected, &output);

    Ok(())
}
```
On `main`, this fails with:
```
expected:
[
    "+-----+-----+-------+----------+",
    "| a   | b   | const | 1[count] |",
    "+-----+-----+-------+----------+",
    "|     |     | 1     | 32768    |",
    "|     | 0.0 |       | 32768    |",
    "| 0.0 |     |       | 32768    |",
    "+-----+-----+-------+----------+",
]
actual:
[
    "+-----+-----+-------+----------+",
    "| a   | b   | const | 1[count] |",
    "+-----+-----+-------+----------+",
    "|     | 0.0 | 1     | 16384    |",
    "|     | 0.0 | 1     | 8192     |",
    "|     | 0.0 | 1     | 8192     |",
    "| 0.0 |     | 1     | 16384    |",
    "| 0.0 |     | 1     | 8192     |",
    "| 0.0 |     | 1     | 8192     |",
    "| 0.0 | 0.0 |       | 16384    |",
    "| 0.0 | 0.0 |       | 16384    |",
    "+-----+-----+-------+----------+",
]
```
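One way to read the `actual` output is to sum the counts of rows that share a key. The sketch below is only a reading aid: the rows are copied from the failure output above, while `Key`, `actual_rows`, and `merge` are hypothetical helpers that are not part of DataFusion. It suggests that no input rows are lost; each group is simply split into several fragments.

```rust
use std::collections::BTreeMap;

/// (a, b, const) as printed in the failure output; `None` is an empty cell.
type Key = (Option<&'static str>, Option<&'static str>, Option<&'static str>);

/// The eight rows of the `actual` table, copied verbatim.
fn actual_rows() -> Vec<(Key, u64)> {
    vec![
        ((None, Some("0.0"), Some("1")), 16384),
        ((None, Some("0.0"), Some("1")), 8192),
        ((None, Some("0.0"), Some("1")), 8192),
        ((Some("0.0"), None, Some("1")), 16384),
        ((Some("0.0"), None, Some("1")), 8192),
        ((Some("0.0"), None, Some("1")), 8192),
        ((Some("0.0"), Some("0.0"), None), 16384),
        ((Some("0.0"), Some("0.0"), None), 16384),
    ]
}

/// Sums the counts of all rows that share the same key.
fn merge(rows: &[(Key, u64)]) -> BTreeMap<Key, u64> {
    let mut merged = BTreeMap::new();
    for &(key, count) in rows {
        *merged.entry(key).or_insert(0) += count;
    }
    merged
}

fn main() {
    for (key, total) in merge(&actual_rows()) {
        println!("{:?} -> {}", key, total);
    }
}
```

Each of the three distinct keys sums to 32768, which equals the 4 batches of 8192 rows fed into the test.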
### Expected behavior
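It should work :) Each group should be emitted exactly once, with a count of
32768 (4 batches of 8192 rows), as in the `expected` output above.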
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]