geoffreyclaude opened a new pull request, #19669:
URL: https://github.com/apache/datafusion/pull/19669
## Which issue does this PR close?
- N/A
## Rationale for this change
This change fixes a failure in the `CORR` aggregate function when it runs in
streaming mode. `CorrelationGroupsAccumulator` did not drain its per-group state
vectors when emitting with `EmitTo::First`, so the state of already-emitted
groups persisted across emissions. This caused memory leaks, incorrect results
for subsequent groups, and "length mismatch" errors, because the internal vector
sizes diverged from the number of emitted groups.
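The failure mode can be illustrated with a deliberately simplified sketch. `BuggyAccumulator` is hypothetical and keeps a single running sum per group; it is not the real `CorrelationGroupsAccumulator`, and `EmitTo` is redefined locally so the block compiles on its own. The point is only that copying, rather than removing, the emitted entries leaves stale state behind.

```rust
/// Local stand-in for DataFusion's `EmitTo`, redefined so this sketch is
/// self-contained. Not the library type.
#[derive(Clone, Copy, Debug)]
pub enum EmitTo {
    All,
    First(usize),
}

/// Hypothetical, heavily simplified accumulator: one running sum per group,
/// indexed by group id.
pub struct BuggyAccumulator {
    pub sums: Vec<f64>,
}

impl BuggyAccumulator {
    /// Returns values for the requested groups but never removes the
    /// emitted entries from `self.sums`, mirroring the bug described above.
    pub fn evaluate(&mut self, emit_to: EmitTo) -> Vec<f64> {
        match emit_to {
            EmitTo::All => self.sums.clone(),
            // Bug: copies the first `n` entries instead of draining them.
            EmitTo::First(n) => self.sums[..n.min(self.sums.len())].to_vec(),
        }
    }
}
```

With four groups, `evaluate(EmitTo::First(2))` correctly returns two values, but `sums` still holds four entries. The caller now tracks only two groups, so a later `evaluate(EmitTo::All)` returns four values for two groups: exactly the kind of length divergence that surfaces as a record-batch error.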
### Reproducer
```sql
SELECT bucket, grp, CORR(x, y) FROM (
    SELECT * FROM (VALUES
        (1, 1, 1.0, 1.0), (1, 1, 2.0, 2.0),
        (1, 2, 1.0, 2.0), (1, 2, 2.0, 1.0),
        (1, 3, 1.0, 1.0), (1, 3, 2.0, 2.0),
        (1, 4, 1.0, 2.0), (1, 4, 2.0, 1.0),
        (2, 1, 1.0, 5.0), (2, 1, 2.0, 5.0),
        (2, 2, 1.0, 5.0), (2, 2, 2.0, 5.0),
        (2, 3, 1.0, 5.0), (2, 3, 2.0, 5.0),
        (2, 4, 1.0, 5.0), (2, 4, 2.0, 5.0)
    ) AS t(bucket, grp, x, y)
    ORDER BY bucket
    LIMIT 1000000
) AS ordered_data
GROUP BY bucket, grp
ORDER BY bucket, grp;
```
**Before**: `DataFusion error: Arrow error: Invalid argument error: all columns in a record batch must have the same length`
**After**:
```
1 1 1
1 2 -1
1 3 1
1 4 -1
2 1 NULL
2 2 NULL
2 3 NULL
2 4 NULL
```
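The expected values can be checked by hand with the Pearson formula. The sketch below assumes CORR is computed from six running sums per group; that layout is an illustrative assumption and may differ from the accumulator's actual internals. In this sketch, `None` stands for SQL NULL, returned when fewer than two rows were seen or either column has zero variance. That is why every bucket 2 group, where `y` is constant at 5.0, is NULL.

```rust
/// Running sums for one group (illustrative layout, not DataFusion's).
#[derive(Default)]
pub struct CorrState {
    n: f64,
    sum_x: f64,
    sum_y: f64,
    sum_xy: f64,
    sum_xx: f64,
    sum_yy: f64,
}

impl CorrState {
    /// Folds one (x, y) row into the running sums.
    pub fn update(&mut self, x: f64, y: f64) {
        self.n += 1.0;
        self.sum_x += x;
        self.sum_y += y;
        self.sum_xy += x * y;
        self.sum_xx += x * x;
        self.sum_yy += y * y;
    }

    /// Pearson correlation, or `None` when it is undefined.
    pub fn corr(&self) -> Option<f64> {
        if self.n < 2.0 {
            return None;
        }
        // Each term below is n^2 times the covariance or variance; the
        // common factor cancels in the ratio.
        let cov = self.n * self.sum_xy - self.sum_x * self.sum_y;
        let var_x = self.n * self.sum_xx - self.sum_x * self.sum_x;
        let var_y = self.n * self.sum_yy - self.sum_y * self.sum_y;
        if var_x == 0.0 || var_y == 0.0 {
            return None;
        }
        Some(cov / (var_x * var_y).sqrt())
    }
}
```

For group (1, 1), the rows (1, 1) and (2, 2) give `cov = 2*5 - 3*3 = 1` and `var_x = var_y = 1`, so the correlation is 1. For bucket 2, `var_y = 2*50 - 10*10 = 0`, so the result is NULL. The naive sum-of-products form is used here for readability. Production code often prefers numerically stabler online updates.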
## What changes are included in this PR?
This PR is structured into two commits: the first adds a failing test case
to demonstrate the issue, and the second implements the fix.
The accumulator now uses `emit_to.take_needed()` in both `evaluate` and
`state` to properly consume the emitted portions of the state vectors.
Additionally, the `size()` implementation has been updated to use vector
capacity for more accurate memory accounting.
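A minimal sketch of the fixed pattern follows. `EmitTo` and its `take_needed` method are redefined locally. Their behavior is modeled on DataFusion's `EmitTo::take_needed`: remove and return the first `n` entries, leaving the rest in place. `FixedAccumulator` is hypothetical, with two state vectors and a per-group mean standing in for the real CORR computation. Every state vector is drained in lockstep, so the remaining entries stay aligned with the groups the caller still tracks.

```rust
/// Local stand-in for DataFusion's `EmitTo`.
#[derive(Clone, Copy, Debug)]
pub enum EmitTo {
    All,
    First(usize),
}

impl EmitTo {
    /// Removes and returns the entries to emit, leaving the rest in `v`.
    pub fn take_needed<T>(&self, v: &mut Vec<T>) -> Vec<T> {
        match self {
            EmitTo::All => std::mem::take(v),
            EmitTo::First(n) => {
                let split_at = (*n).min(v.len());
                // `split_off` returns the tail; swap so `v` keeps the tail
                // and the head (the emitted part) is returned.
                let mut tail = v.split_off(split_at);
                std::mem::swap(v, &mut tail);
                tail
            }
        }
    }
}

/// Hypothetical simplified accumulator with two per-group state vectors.
pub struct FixedAccumulator {
    pub counts: Vec<u64>,
    pub sums: Vec<f64>,
}

impl FixedAccumulator {
    /// Drains the emitted groups from every state vector, then computes a
    /// per-group mean as a stand-in for the real aggregate.
    pub fn evaluate(&mut self, emit_to: EmitTo) -> Vec<f64> {
        let counts = emit_to.take_needed(&mut self.counts);
        let sums = emit_to.take_needed(&mut self.sums);
        counts.iter().zip(&sums).map(|(&c, &s)| s / c as f64).collect()
    }

    /// Reports allocated bytes using `capacity()`, which also counts
    /// reserved-but-unused slots that `len()` would miss.
    pub fn size(&self) -> usize {
        self.counts.capacity() * std::mem::size_of::<u64>()
            + self.sums.capacity() * std::mem::size_of::<f64>()
    }
}
```

After `evaluate(EmitTo::First(2))` on four groups, both vectors hold exactly two entries, matching the two groups left. Sizing by `capacity()` instead of `len()` reflects what the allocator actually holds, which is what memory accounting needs to track.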
## Are these changes tested?
Yes, a new test case in `aggregate.slt` triggers streaming aggregation via
an ordered subquery. This test previously crashed with an Arrow length mismatch
error and now produces correct results.
## Are there any user-facing changes?
Yes. SQL queries that use `CORR` in a streaming aggregation (typically when the
input is already sorted on some or all of the `GROUP BY` columns, as in the
reproducer above) now succeed instead of failing with a length mismatch error.