avantgardnerio commented on issue #23194: URL: https://github.com/apache/datafusion/issues/23194#issuecomment-4858814434
@comphead if you click through into the (working) [reference PoC](https://github.com/apache/datafusion/pull/23167), you'll find numbered stages:

```
01)RuntimeOptimizerExec
02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, group_key@0)], projection=[group_key@2, sum_payload@3, payload@1]
03)----CoalescePartitionsExec
04)------DataSourceExec: partitions=4, partition_sizes=[1, 0, 0, 0]
05)----ProjectionExec: expr=[group_key@0 as group_key, sum(big.payload)@1 as sum_payload]
06)------PipelineBreakerBuffer
07)--------AggregateExec: mode=FinalPartitioned, gby=[group_key@0 as group_key], aggr=[sum(big.payload)]
08)----------RepartitionExec: partitioning=Hash([group_key@0], 4), input_partitions=4
09)------------PipelineBreakerBuffer
10)--------------AggregateExec: mode=Partial, gby=[group_key@0 as group_key], aggr=[sum(big.payload)]
11)----------------DataSourceExec: partitions=4, partition_sizes=[4, 3, 3, 3]
```

They just execute in an existing `Arc<dyn PhysicalPlanNode>`, being re-written at boundaries (obviously not the in-progress ones).

> some coordinator that knows about all hash sizes, allocations, stats

See `RuntimeOptimizerExec`.

> For distributed the stats synchronization

For Ballista, I assumed this would just be:

1. recognize the stage boundary,
2. send the info to the scheduler on the stage completion gRPC call (admittedly this breaks down with streaming or large responses).

I was hoping we could start with a dedicated stats collector node outside any operator (`PipelineBreakerBuffer` does this with row_count in the PR, `SortExtrema` in another, no reason not to add more - KLL, etc.), but it's an interesting question if we can crack open existing operators and extract their state, or if these concerns are intractable.

> Is the scope to come up with some universal trait/approach that would shape the AQE for single machine and distributed?

That is what I interpreted I was being asked to do by Andrew, yes.
This adds true AQE to DF without much disruption (comparatively speaking :), and defers the distribution concerns (and distribution only) to downstream implementations.
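
To make the "dedicated stats collector node" idea concrete, here is a minimal, std-only Rust sketch. It is not the PoC's code and does not use any DataFusion API: `StageStats`, `RowCountBuffer`, `BuildSide`, and `choose_build_side` are hypothetical names invented for illustration. The assumption is that a pass-through buffer at a pipeline breaker counts rows as they arrive, and that something at the stage boundary reads those counts to decide how to re-write the not-yet-started part of the plan (here, which join input to build the hash table on).

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical runtime statistics shared between a collector node and
/// whatever re-plans the remaining stages.
#[derive(Default)]
struct StageStats {
    row_count: AtomicUsize,
}

impl StageStats {
    fn rows(&self) -> usize {
        self.row_count.load(Ordering::Relaxed)
    }
}

/// Toy pass-through buffer (a stand-in, not the PR's operator): it holds a
/// stage's output batches and counts rows as they are pushed.
struct RowCountBuffer {
    stats: Arc<StageStats>,
    batches: Vec<Vec<i64>>,
}

impl RowCountBuffer {
    fn new(stats: Arc<StageStats>) -> Self {
        Self { stats, batches: Vec::new() }
    }

    /// Buffer one batch and add its length to the shared row count.
    fn push(&mut self, batch: Vec<i64>) {
        self.stats.row_count.fetch_add(batch.len(), Ordering::Relaxed);
        self.batches.push(batch);
    }

    fn buffered_batches(&self) -> usize {
        self.batches.len()
    }
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum BuildSide {
    Left,
    Right,
}

/// Toy re-planning rule applied at a stage boundary: build the hash table
/// on the input with fewer observed rows (ties keep the left side).
fn choose_build_side(left_rows: usize, right_rows: usize) -> BuildSide {
    if right_rows < left_rows {
        BuildSide::Right
    } else {
        BuildSide::Left
    }
}

fn main() {
    let left = Arc::new(StageStats::default());
    let right = Arc::new(StageStats::default());
    let mut left_buf = RowCountBuffer::new(Arc::clone(&left));
    let mut right_buf = RowCountBuffer::new(Arc::clone(&right));

    // Both upstream stages run to completion and fill their buffers.
    left_buf.push(vec![1, 2, 3]);
    left_buf.push(vec![4, 5]);
    right_buf.push(vec![7]);

    // At the boundary, the observed counts drive the re-write decision.
    let side = choose_build_side(left.rows(), right.rows());
    println!(
        "left: {} rows in {} batches, right: {} rows in {} batches, build side: {:?}",
        left.rows(),
        left_buf.buffered_batches(),
        right.rows(),
        right_buf.buffered_batches(),
        side
    );
}
```

Keeping the counter outside the operators that produce the data is the point of the sketch: further statistics (extrema, sketches such as KLL) could be added to the shared struct without touching the producing operators.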
