korowa commented on code in PR #13995:
URL: https://github.com/apache/datafusion/pull/13995#discussion_r1903949142
##########
datafusion/core/src/dataframe/mod.rs:
##########
@@ -2743,6 +2754,143 @@ mod tests {
Ok(())
}
+ // test for https://github.com/apache/datafusion/issues/13949
+ async fn run_test_with_spill_pool_if_necessary(
Review Comment:
I suppose it would be better to move this test to the other aggregate tests in
`datafusion/physical-plan/src/mod.rs`
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -522,7 +527,7 @@ impl GroupedHashAggregateStream {
let spill_state = SpillState {
spills: vec![],
spill_expr,
- spill_schema: Arc::clone(&agg_schema),
+ spill_schema: partial_agg_schema,
Review Comment:
It seems like the issue was related only to the
`AggregateMode::Single[Partitioned]` cases, since for both `Final` and
`FinalPartitioned` there is a
[reassignment](https://github.com/apache/datafusion/blob/487b952cf1a748cc79724638f13e66761a6665e2/datafusion/physical-plan/src/aggregates/row_hash.rs#L969)
right before spilling (the new value is the schema of the Partial output, which is
exactly the group_by + state fields). Perhaps we can remove this reassignment now
and rely on the original `spill_schema` value set on stream creation (before removing
it, we need to ensure that the spill schema will be equal to the intermediate result
schema for any aggregation mode which supports spilling)?
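For illustration only, a minimal self-contained sketch of that option (the names `SpillState`, `spill_schema`, and `partial_agg_schema` mirror the diff, but the types here are simplified stand-ins, not DataFusion's actual implementation):

```rust
use std::sync::Arc;
use arrow_schema::{DataType, Field, Schema, SchemaRef};

/// Simplified stand-in for the spill-related state held by the stream.
struct SpillState {
    spill_schema: SchemaRef,
}

/// Suggested shape: every spilling-capable mode gets the intermediate
/// (group_by + accumulator state) schema already at stream creation.
fn new_spill_state(partial_agg_schema: SchemaRef) -> SpillState {
    SpillState {
        spill_schema: partial_agg_schema,
    }
}

/// Current shape (simplified): Final / FinalPartitioned patch the schema
/// right before the first spill instead.
fn reassign_before_spill(spill: &mut SpillState, partial_agg_schema: &SchemaRef) {
    spill.spill_schema = Arc::clone(partial_agg_schema);
}

fn main() {
    // Intermediate schema: group column plus accumulator state column(s);
    // the column names are illustrative only.
    let partial_agg_schema: SchemaRef = Arc::new(Schema::new(vec![
        Field::new("group_col", DataType::Utf8, true),
        Field::new("count(x)[count]", DataType::Int64, true),
    ]));

    let mut spill = new_spill_state(Arc::clone(&partial_agg_schema));
    // With the schema fixed at creation, the pre-spill reassignment is a no-op.
    reassign_before_spill(&mut spill, &partial_agg_schema);
    assert_eq!(spill.spill_schema, partial_agg_schema);
}
```

If the intermediate schema is set once at stream creation for every mode that supports spilling, the mode-specific reassignment before the first spill becomes redundant and could be dropped.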
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -802,6 +807,45 @@ impl RecordBatchStream for GroupedHashAggregateStream {
}
}
+// fix https://github.com/apache/datafusion/issues/13949
+/// Builds a **partial aggregation** schema by combining the group columns and
+/// the accumulator state columns produced by each aggregate expression.
+///
+/// # Why Partial Aggregation Schema Is Needed
+///
+/// In a multi-stage (partial/final) aggregation strategy, each partial-aggregate
+/// operator produces *intermediate* states (e.g., partial sums, counts) rather
+/// than final scalar values. These extra columns do **not** exist in the original
+/// input schema (which may be something like `[colA, colB, ...]`). Instead, each
+/// aggregator adds its own internal state columns (e.g., `[acc_state_1, acc_state_2, ...]`).
+///
+/// Therefore, when we spill these intermediate states or pass them to another
+/// aggregation operator, we must use a schema that includes both the group
+/// columns **and** the partial-state columns. Otherwise, using the original input
+/// schema to read partial states will result in a column-count mismatch error.
+///
+/// This helper function constructs such a schema:
+/// `[group_col_1, group_col_2, ..., state_col_1, state_col_2, ...]`
+/// so that partial aggregation data can be handled consistently.
+fn build_partial_agg_schema(
Review Comment:
Perhaps instead of the new helper we could reuse
[aggregates::create_schema](https://github.com/apache/datafusion/blob/487b952cf1a748cc79724638f13e66761a6665e2/datafusion/physical-plan/src/aggregates/mod.rs#L895)?
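For context, a hedged sketch of why the intermediate schema differs from the final output schema. It uses the `arrow_schema` crate directly; the state column names such as `avg(col_b)[sum]` are illustrative rather than DataFusion's exact naming, and this is not the actual `create_schema` helper:

```rust
use std::sync::Arc;
use arrow_schema::{DataType, Field, Schema};

fn main() {
    // Group-by column(s) of the query, e.g. `GROUP BY col_a`.
    let group_fields = vec![Field::new("col_a", DataType::Utf8, true)];

    // Intermediate state of AVG(col_b): the accumulator exposes its own
    // state columns (count + sum) rather than a single finalized value.
    let avg_state_fields = vec![
        Field::new("avg(col_b)[count]", DataType::UInt64, true),
        Field::new("avg(col_b)[sum]", DataType::Float64, true),
    ];

    // Schema of spilled intermediate batches: group columns + state columns.
    let partial_agg_schema = Arc::new(Schema::new(
        group_fields
            .iter()
            .cloned()
            .chain(avg_state_fields.iter().cloned())
            .collect::<Vec<_>>(),
    ));

    // Final output schema carries only the finalized aggregate value.
    let final_schema = Arc::new(Schema::new(vec![
        Field::new("col_a", DataType::Utf8, true),
        Field::new("avg(col_b)", DataType::Float64, true),
    ]));

    // Reading the 3-column spilled batches through the 2-column final schema
    // is exactly the column-count mismatch described in the doc comment above.
    assert_ne!(partial_agg_schema.fields().len(), final_schema.fields().len());
}
```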