2010YOUY01 opened a new issue, #22710:
URL: https://github.com/apache/datafusion/issues/22710

   ### Is your feature request related to a problem or challenge?
   
   \* “Stream” here refers to an `XxxStream` in DataFusion, which implements 
the per-partition **state machine** for operators such as `AggregateExec`.
   
   Conventional wisdom says we should maximize code reuse. In practice, 
over-applying this principle can lead to code that is difficult to understand, 
maintain, and extend.
   
   `GroupedHashAggregateStream` is a good example. Today it is shared across 
many **semantically distinct** execution paths.
   
https://github.com/apache/datafusion/blob/5c92390921d8d667aa7cb7d56276a59ba36926f4/datafusion/physical-plan/src/aggregates/row_hash.rs#L358
   
   
   For example, in multi-stage, repartition-based hash aggregation, the partial and final aggregation stages have fundamentally different semantics:
   
   - Partial aggregation: raw input → partial state
   - Final aggregation: partial state → final result
   
   \* For example, for `avg(x)` the partial state of each group is `sum(x)` and `count(x)`, computed in the partial stage. The final result is `avg(x)` for each group, which maps directly to the output.
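   
   A minimal sketch of the two stages for `avg(x)`. It uses plain `HashMap`s instead of Arrow arrays and DataFusion's accumulator traits, and the names `AvgState`, `partial_avg`, and `final_avg` are illustrative, not DataFusion APIs:
   
   ```rust
   use std::collections::HashMap;
   
   /// Partial state for `avg(x)` within one group: running sum and count.
   #[derive(Debug, Clone, Copy, Default, PartialEq)]
   struct AvgState {
       sum: f64,
       count: u64,
   }
   
   /// Partial stage: raw input rows (group key, x) -> partial state per group.
   fn partial_avg(rows: &[(&'static str, f64)]) -> HashMap<&'static str, AvgState> {
       let mut states: HashMap<&'static str, AvgState> = HashMap::new();
       for &(key, x) in rows {
           let s = states.entry(key).or_default();
           s.sum += x;
           s.count += 1;
       }
       states
   }
   
   /// Final stage: merge partial states for the same group, then
   /// evaluate `avg(x) = sum / count` for each group.
   fn final_avg(partials: &[HashMap<&'static str, AvgState>]) -> HashMap<&'static str, f64> {
       let mut merged: HashMap<&'static str, AvgState> = HashMap::new();
       for partial in partials {
           for (&key, s) in partial {
               let m = merged.entry(key).or_default();
               m.sum += s.sum;
               m.count += s.count;
           }
       }
       merged
           .into_iter()
           .map(|(key, s)| (key, s.sum / s.count as f64))
           .collect()
   }
   
   fn main() {
       // Two input partitions, each aggregated independently in the partial stage.
       let p1 = partial_avg(&[("a", 1.0), ("b", 10.0), ("a", 3.0)]);
       let p2 = partial_avg(&[("a", 5.0), ("b", 20.0)]);
       let result = final_avg(&[p1, p2]);
       assert_eq!(result["a"], 3.0); // (1 + 3 + 5) / 3
       assert_eq!(result["b"], 15.0); // (10 + 20) / 2
   }
   ```
   
   Note that the two stages consume and produce different types (`f64` rows vs. `AvgState`), which is exactly the semantic difference described above.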
   
   There are additional semantic variants, such as partial state → partial 
state. Beyond that, there are several orthogonal dimensions:
   
   - Is the input ordered by the grouping keys? If so, streaming aggregation 
may be possible.
   - Does the aggregation exceed the memory budget and require spilling?
   - Are there specialized fast paths applicable to a particular execution path?
   
   As more dimensions are multiplexed into a single implementation, complexity 
grows combinatorially:
   
   ```text
   Execution Paths =
       O(semantic_variant)
     * O(spilling_variant)
     * O(ordering_variant)
     * ...
   ```
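   
   To make the growth concrete, here is a sketch that enumerates only three dimensions: the semantic variants named above, plus hypothetical spilling and input-ordering enums (none of these types exist in DataFusion):
   
   ```rust
   /// Semantic variants named in the text.
   #[derive(Debug, Clone, Copy)]
   enum Semantic {
       RawToPartial,
       PartialToFinal,
       PartialToPartial,
   }
   
   /// Hypothetical spilling dimension.
   #[derive(Debug, Clone, Copy)]
   enum Spilling {
       InMemory,
       Spills,
   }
   
   /// Hypothetical input-ordering dimension.
   #[derive(Debug, Clone, Copy)]
   enum InputOrder {
       Unordered,
       SortedByGroupKeys,
   }
   
   const SEMANTICS: [Semantic; 3] = [
       Semantic::RawToPartial,
       Semantic::PartialToFinal,
       Semantic::PartialToPartial,
   ];
   const SPILLING: [Spilling; 2] = [Spilling::InMemory, Spilling::Spills];
   const ORDERS: [InputOrder; 2] = [InputOrder::Unordered, InputOrder::SortedByGroupKeys];
   
   /// Every combination a single multiplexed stream would have to handle.
   fn execution_paths() -> Vec<(Semantic, Spilling, InputOrder)> {
       let mut paths = Vec::new();
       for s in SEMANTICS {
           for sp in SPILLING {
               for o in ORDERS {
                   paths.push((s, sp, o));
               }
           }
       }
       paths
   }
   
   fn main() {
       let paths = execution_paths();
       // 3 semantic variants * 2 spilling modes * 2 input orders
       assert_eq!(paths.len(), 3 * 2 * 2);
       println!("{} execution paths", paths.len()); // prints "12 execution paths"
   }
   ```
   
   Adding one more binary dimension, such as whether a specialized fast path applies, doubles this to 24.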
   
   At this point, the code looks like a neural network written in Rust: many 
interacting branches, but no clear separation of responsibilities.
   
   ## Issues
   
   ### Error-prone
   
   The current implementation relies on combinations of flags to determine 
which execution path is active. This makes **invalid states representable**, 
increasing the risk of subtle bugs.
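   
   A minimal illustration with hypothetical flags (these are not the actual fields of `GroupedHashAggregateStream`). Three booleans allow 8 combinations, and a reviewer must remember which of them are meaningful. An enum with one variant per legal state removes the invalid combinations by construction, and `match` forces every state to be handled:
   
   ```rust
   /// Flag-based state: all 2^3 = 8 combinations are representable.
   #[derive(Clone, Copy)]
   struct FlagState {
       input_done: bool,
       spilled: bool,
       merging_spills: bool,
   }
   
   impl FlagState {
       /// The invariant a reviewer must keep in mind: merging spilled runs only
       /// makes sense after the input is exhausted and something was spilled.
       fn is_valid(&self) -> bool {
           !self.merging_spills || (self.input_done && self.spilled)
       }
   }
   
   /// Enum-based state: each variant is a legal state, and data such as the
   /// number of remaining spill runs exists only in the state that uses it.
   #[allow(dead_code)]
   enum AggState {
       ReadingInput { spilled: bool },
       MergingSpills { remaining_runs: usize },
       EmittingInMemory { next_group: usize },
       Done,
   }
   
   /// `match` must cover every variant, so no state can be forgotten.
   fn describe(state: &AggState) -> &'static str {
       match state {
           AggState::ReadingInput { .. } => "reading input",
           AggState::MergingSpills { .. } => "merging spilled runs",
           AggState::EmittingInMemory { .. } => "emitting from hash table",
           AggState::Done => "done",
       }
   }
   
   fn main() {
       // Enumerate all flag combinations and count the invalid ones.
       let invalid = (0u8..8)
           .map(|bits| FlagState {
               input_done: bits & 1 != 0,
               spilled: bits & 2 != 0,
               merging_spills: bits & 4 != 0,
           })
           .filter(|s| !s.is_valid())
           .count();
       // merging_spills = true is valid only when input_done && spilled.
       assert_eq!(invalid, 3);
   
       let state = AggState::MergingSpills { remaining_runs: 2 };
       assert_eq!(describe(&state), "merging spilled runs");
   }
   ```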
   
   ### Difficult to test
   
   It is nearly impossible to exhaustively test all execution paths and state 
transitions. As a result, invalid state combinations can easily escape test 
coverage.
   
   ### Difficult to review
   
   Review complexity grows with the number of multiplexed dimensions.
   
   When reviewing a function, it is often unclear:
   - Which execution paths reach this code?
   - Is a change correct for all paths?
   - Does an optimization for one path introduce regressions in another?
   
   Reasoning about correctness becomes increasingly difficult.
   
   ### Difficult to extend
   
   Performance engineering often requires specialization.
   
   There are still several promising optimization opportunities for hash 
aggregation, but implementing them within the existing structure would further 
increase complexity, making the code even harder to understand and review.
   
   ### Case Study: Blocked State Management
   
   I think this is a concrete example of the challenges mentioned above: blocked state management is an important feature for memory-efficient hash aggregation. Despite significant effort from multiple very good contributors, it has still not landed roughly three years after it was first proposed.
   
   My interpretation is that the existing implementation has accumulated enough 
complexity that substantial changes become difficult to design, review, and 
validate.
   
   - https://github.com/apache/datafusion/issues/7065
   - https://github.com/apache/datafusion/issues/19649
   
   ## Proposed Solution
   
   Split the heavily multiplexed `GroupedHashAggregateStream` into a set of 
focused streams.
   
   Each stream should implement a single semantic execution path and 
encapsulate its own state machine.
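   
   A sketch of what one focused stream could look like, under simplifying assumptions: `Iterator` stands in for the async `Stream` trait, input batches are `Vec<String>` of group keys instead of Arrow `RecordBatch`es, and the aggregate is `count(*)`. `PartialCountStream` and `PartialCountState` are hypothetical names. The state machine contains only the states this one path needs, with no final-mode, spilling, or ordered-input branches:
   
   ```rust
   use std::collections::HashMap;
   
   /// States for the raw input -> partial state path, in memory only.
   enum PartialCountState {
       /// Consuming input batches and updating the hash table.
       ReadingInput,
       /// Input exhausted; emitting (group key, partial count) rows.
       ProducingOutput(std::vec::IntoIter<(String, u64)>),
       /// All output emitted.
       Done,
   }
   
   /// Hypothetical focused stream: raw input -> partial `count(*)` per group.
   struct PartialCountStream<I: Iterator<Item = Vec<String>>> {
       input: I,
       groups: HashMap<String, u64>,
       state: PartialCountState,
   }
   
   impl<I: Iterator<Item = Vec<String>>> PartialCountStream<I> {
       fn new(input: I) -> Self {
           Self {
               input,
               groups: HashMap::new(),
               state: PartialCountState::ReadingInput,
           }
       }
   }
   
   impl<I: Iterator<Item = Vec<String>>> Iterator for PartialCountStream<I> {
       type Item = (String, u64);
   
       fn next(&mut self) -> Option<Self::Item> {
           loop {
               match &mut self.state {
                   PartialCountState::ReadingInput => match self.input.next() {
                       // Each batch is a list of group keys; count rows per key.
                       Some(batch) => {
                           for key in batch {
                               *self.groups.entry(key).or_insert(0) += 1;
                           }
                       }
                       // Input exhausted: switch to emitting the partial states.
                       None => {
                           let mut rows: Vec<_> =
                               std::mem::take(&mut self.groups).into_iter().collect();
                           rows.sort(); // deterministic order for the sketch
                           self.state = PartialCountState::ProducingOutput(rows.into_iter());
                       }
                   },
                   PartialCountState::ProducingOutput(rows) => match rows.next() {
                       Some(row) => return Some(row),
                       None => self.state = PartialCountState::Done,
                   },
                   PartialCountState::Done => return None,
               }
           }
       }
   }
   
   fn main() {
       let batches = vec![
           vec!["a".to_string(), "b".to_string()],
           vec!["a".to_string()],
       ];
       let out: Vec<(String, u64)> = PartialCountStream::new(batches.into_iter()).collect();
       assert_eq!(out, vec![("a".to_string(), 2), ("b".to_string(), 1)]);
   }
   ```
   
   A spilling or sorted-input variant would be a separate struct with its own state enum, rather than extra flags on this one.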
   
   I think this addresses all of the issues mentioned above:
   - Invalid states become unrepresentable.
   - Test coverage becomes more targeted and practical.
   - Review scope becomes significantly smaller.
   - Specialized optimizations become easier to implement.
   
   The tradeoff is some duplication of structs and state machine 
implementations. However, this is often preferable to concentrating all 
complexity into a single, highly coupled implementation.
   
   ## Implementation Strategy
   
   The migration can be performed incrementally.
   
   Individual execution paths can be extracted into dedicated streams while 
leaving the existing implementation unchanged. Once all paths have been 
migrated, the original implementation can be removed.
   
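   ```rust
   // Sketch only: `choose_stream`, `StreamKind`, and the `build_*_stream`
   // helpers are hypothetical; `execute` follows the `ExecutionPlan` trait.
   impl ExecutionPlan for AggregateExec {
       fn execute(
           &self,
           partition: usize,
           context: Arc<TaskContext>,
       ) -> Result<SendableRecordBatchStream> {
           match self.choose_stream() {
               StreamKind::Partial => self.build_partial_stream(partition, context),
               StreamKind::Final => self.build_final_stream(partition, context),
               // ... more focused streams, extracted one at a time
   
               // Original implementation (`GroupedHashAggregateStream`)
               _ => self.build_fallback_stream(partition, context),
           }
       }
       // (other `ExecutionPlan` methods omitted)
   }
   ```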
   
   ## Open Questions
   
   This idea may also apply to other operators.
   
   For example, joins often contain specialized semantics for semi, anti, and 
mark joins. Implementing short-circuit optimizations for these join types may 
be simpler if each variant is represented by its own dedicated state machine 
rather than being multiplexed into a single implementation.
   
   - https://github.com/apache/datafusion/pull/21775
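   
   The approach in the linked PR is not reproduced here. As a hedged illustration of the kind of short-circuit meant, consider a nested-loop join with an arbitrary predicate: an inner join must test every build row for each probe row, while semi and anti joins only need to know whether *any* build row matches, so they can stop at the first match. The function names are hypothetical:
   
   ```rust
   use std::cell::Cell;
   
   /// Inner join: every matching (probe, build) pair is output, so all build
   /// rows must be tested for each probe row.
   fn inner_join(
       probe: &[i64],
       build: &[i64],
       pred: &impl Fn(i64, i64) -> bool,
   ) -> Vec<(i64, i64)> {
       let mut out = Vec::new();
       for &p in probe {
           for &b in build {
               if pred(p, b) {
                   out.push((p, b));
               }
           }
       }
       out
   }
   
   /// Left semi join: keep a probe row if any build row matches.
   /// `Iterator::any` stops at the first match.
   fn semi_join(probe: &[i64], build: &[i64], pred: &impl Fn(i64, i64) -> bool) -> Vec<i64> {
       probe
           .iter()
           .copied()
           .filter(|&p| build.iter().any(|&b| pred(p, b)))
           .collect()
   }
   
   /// Left anti join: keep a probe row if no build row matches; it can also
   /// give up on a probe row at the first match.
   fn anti_join(probe: &[i64], build: &[i64], pred: &impl Fn(i64, i64) -> bool) -> Vec<i64> {
       probe
           .iter()
           .copied()
           .filter(|&p| !build.iter().any(|&b| pred(p, b)))
           .collect()
   }
   
   fn main() {
       let probe: [i64; 2] = [1, 5];
       let build: [i64; 3] = [2, 3, 4];
       // Count predicate evaluations to make the short-circuit visible.
       let evals = Cell::new(0);
       let pred = |p: i64, b: i64| {
           evals.set(evals.get() + 1);
           p < b
       };
   
       assert_eq!(inner_join(&probe, &build, &pred), vec![(1, 2), (1, 3), (1, 4)]);
       assert_eq!(evals.replace(0), 6); // 2 probe rows * 3 build rows
       assert_eq!(semi_join(&probe, &build, &pred), vec![1]);
       assert_eq!(evals.replace(0), 4); // probe 1 stops after 1 test; probe 5 tests all 3
       assert_eq!(anti_join(&probe, &build, &pred), vec![5]);
       assert_eq!(evals.replace(0), 4);
   }
   ```
   
   With one dedicated code path per join type, this early exit could live in that path alone instead of being guarded by join-type checks inside a shared loop.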
   