2010YOUY01 opened a new issue, #22710:
URL: https://github.com/apache/datafusion/issues/22710
### Is your feature request related to a problem or challenge?

\* “Stream” here refers to an `XxxStream` in DataFusion, which implements the per-partition **state machine** for operators such as `AggregateExec`.

Conventional wisdom says we should maximize code reuse. In practice, over-applying this principle can lead to code that is difficult to understand, maintain, and extend.

`GroupedHashAggregateStream` is a good example. Today it is shared across many **semantically distinct** execution paths.

https://github.com/apache/datafusion/blob/5c92390921d8d667aa7cb7d56276a59ba36926f4/datafusion/physical-plan/src/aggregates/row_hash.rs#L358

For example, in multi-stage, repartition-based hash aggregation, the partial and final aggregation stages have fundamentally different semantics:

- Partial aggregation: raw input → partial state
- Final aggregation: partial state → final result

\* For example, for `avg(x)` the partial state is `sum(x)` and `count(x)` for each group, computed in the partial stage. The final result is `avg(x)` for each group, which maps directly to the output.

There are additional semantic variants, such as partial state → partial state.

Beyond that, there are several orthogonal dimensions:

- Is the input ordered by the grouping keys? If so, streaming aggregation may be possible.
- Does the aggregation exceed the memory budget and require spilling?
- Are there specialized fast paths applicable to a particular execution path?

As more dimensions are multiplexed into a single implementation, complexity grows combinatorially:

```text
Execution Paths = O(semantic_variant) * O(spilling_variant) * O(ordering_variant) * ...
```

At this point, the code looks like a neural network written in Rust: many interacting branches, but no clear separation of responsibilities.

## Issues

### Error-prone

The current implementation relies on combinations of flags to determine which execution path is active.
This makes **invalid states representable**, increasing the risk of subtle bugs.

### Difficult to test

It is nearly impossible to exhaustively test all execution paths and state transitions. As a result, invalid state combinations can easily escape test coverage.

### Difficult to review

Review complexity grows with the number of multiplexed dimensions. When reviewing a function, it is often unclear:

- Which execution paths reach this code?
- Is a change correct for all paths?
- Does an optimization for one path introduce regressions in another?

Reasoning about correctness becomes increasingly difficult.

### Difficult to extend

Performance engineering often requires specialization. There are still several promising optimization opportunities for hash aggregation, but implementing them within the existing structure would further increase complexity, making the code even harder to understand and review.

### Case Study: Blocked State Management

I think this is a concrete example of the challenges mentioned above: blocked state management is an important feature for memory-efficient hash aggregation. Despite significant effort from multiple very good contributors, it has still not landed roughly three years after it was first proposed. My interpretation is that the existing implementation has accumulated enough complexity that substantial changes have become difficult to design, review, and validate.

- https://github.com/apache/datafusion/issues/7065
- https://github.com/apache/datafusion/issues/19649

## Proposed Solution

Split the heavily multiplexed `GroupedHashAggregateStream` into a set of focused streams. Each stream should implement a single semantic execution path and encapsulate its own state machine.

I think this addresses all of the issues mentioned above:

- Invalid states become unrepresentable.
- Test coverage becomes more targeted and practical.
- Review scope becomes significantly smaller.
- Specialized optimizations become easier to implement.
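The split into focused streams can be illustrated with a toy `avg(x)`, as a minimal sketch under simplifying assumptions: `i64` group keys, `f64` values, and plain slices and `HashMap`s instead of Arrow `RecordBatch`es and async `Stream`s. The types `PartialAvgStream` and `FinalAvgStream` are hypothetical and are not how `GroupedHashAggregateStream` works today. Each stream owns a state enum containing only the states its path can reach, so there is no `is_final`-style flag that could be combined with the wrong kind of input.

```rust
use std::collections::HashMap;

/// Partial state of `avg(x)` for one group: running sum and count.
#[derive(Debug, Clone, Copy, Default, PartialEq)]
pub struct AvgState {
    pub sum: f64,
    pub count: u64,
}

/// Hypothetical partial-stage stream: raw input -> partial state.
#[derive(Debug)]
pub enum PartialAvgStream {
    /// Consuming raw `(group_key, x)` rows.
    Aggregating(HashMap<i64, AvgState>),
    /// Partial states have been emitted; no more input is accepted.
    Done,
}

impl PartialAvgStream {
    pub fn new() -> Self {
        PartialAvgStream::Aggregating(HashMap::new())
    }

    /// Update per-group sum and count from one batch of raw rows.
    pub fn push_batch(&mut self, rows: &[(i64, f64)]) -> Result<(), &'static str> {
        match self {
            PartialAvgStream::Aggregating(groups) => {
                for &(key, x) in rows {
                    let state = groups.entry(key).or_default();
                    state.sum += x;
                    state.count += 1;
                }
                Ok(())
            }
            PartialAvgStream::Done => Err("partial stream already finished"),
        }
    }

    /// Emit the partial states and transition to `Done`.
    pub fn finish(&mut self) -> HashMap<i64, AvgState> {
        match std::mem::replace(self, PartialAvgStream::Done) {
            PartialAvgStream::Aggregating(groups) => groups,
            PartialAvgStream::Done => HashMap::new(),
        }
    }
}

/// Hypothetical final-stage stream: partial state -> final `avg(x)`.
#[derive(Debug)]
pub enum FinalAvgStream {
    /// Merging partial states that may come from many partitions.
    Merging(HashMap<i64, AvgState>),
    /// Final results have been emitted.
    Done,
}

impl FinalAvgStream {
    pub fn new() -> Self {
        FinalAvgStream::Merging(HashMap::new())
    }

    /// Merge one set of partial states by adding sums and counts per group.
    pub fn push_partial(&mut self, partial: &HashMap<i64, AvgState>) -> Result<(), &'static str> {
        match self {
            FinalAvgStream::Merging(groups) => {
                for (&key, s) in partial {
                    let merged = groups.entry(key).or_default();
                    merged.sum += s.sum;
                    merged.count += s.count;
                }
                Ok(())
            }
            FinalAvgStream::Done => Err("final stream already finished"),
        }
    }

    /// Compute `avg(x) = sum / count` per group and transition to `Done`.
    pub fn finish(&mut self) -> HashMap<i64, f64> {
        match std::mem::replace(self, FinalAvgStream::Done) {
            FinalAvgStream::Merging(groups) => groups
                .into_iter()
                .map(|(key, s)| (key, s.sum / s.count as f64))
                .collect(),
            FinalAvgStream::Done => HashMap::new(),
        }
    }
}
```

Because sums and counts merge by addition, feeding the partial states of several partitions into one `FinalAvgStream` produces the same averages as a single-stage computation. Each type can be tested on its own, without reasoning about the other path.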
The tradeoff is some duplication of structs and state machine implementations. However, this is often preferable to concentrating all complexity into a single, highly coupled implementation.

## Implementation Strategy

The migration can be performed incrementally. Individual execution paths can be extracted into dedicated streams while leaving the existing implementation unchanged. Once all paths have been migrated, the original implementation can be removed.

```rust
impl ExecutionPlan for AggregateExec {
    // ...
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        // `StreamKind`, `choose_stream`, and the `build_*_stream` helpers are illustrative names
        match self.choose_stream() {
            StreamKind::PartialMode => self.build_partial_stream(partition, context),
            StreamKind::FinalMode => self.build_final_stream(partition, context),
            // Original implementation, for paths not yet migrated
            _ => self.build_fallback_stream(partition, context),
        }
    }
}
```

## Open Questions

This idea may also apply to other operators. For example, joins often contain specialized semantics for semi, anti, and mark joins. Implementing short-circuit optimizations for these join types may be simpler if each variant is represented by its own dedicated state machine rather than being multiplexed into a single implementation.

- https://github.com/apache/datafusion/pull/21775
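The point about joins can be illustrated with a minimal sketch, assuming a toy nested-loop join over non-null `i64` keys rather than DataFusion's join operators; SQL null semantics for mark joins are ignored. Each variant is its own hypothetical function (`left_semi_join`, `left_anti_join`, `left_mark_join`) instead of one function branching on a join-type parameter, so each variant's short-circuit stays local: `Iterator::any` stops scanning the build side at the first match. It is not meant to describe the change in the linked PR.

```rust
/// Left semi join: keep each probe key that has at least one build-side match.
/// `any` returns at the first match, so the rest of the build side is skipped.
pub fn left_semi_join(probe: &[i64], build: &[i64]) -> Vec<i64> {
    probe
        .iter()
        .copied()
        .filter(|p| build.iter().any(|b| b == p))
        .collect()
}

/// Left anti join: keep each probe key that has no build-side match.
/// A matched key is rejected as soon as its first match is found.
pub fn left_anti_join(probe: &[i64], build: &[i64]) -> Vec<i64> {
    probe
        .iter()
        .copied()
        .filter(|p| !build.iter().any(|b| b == p))
        .collect()
}

/// Left mark join: emit every probe key with a flag saying whether it matched.
pub fn left_mark_join(probe: &[i64], build: &[i64]) -> Vec<(i64, bool)> {
    probe
        .iter()
        .map(|&p| (p, build.iter().any(|&b| b == p)))
        .collect()
}
```

A multiplexed version would carry a join-type check inside the inner loop; here each loop contains only the logic its variant needs.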
