avantgardnerio commented on issue #23194:
URL: https://github.com/apache/datafusion/issues/23194#issuecomment-4858814434

   @comphead if you click through to the (working) [reference PoC](https://github.com/apache/datafusion/pull/23167), you'll find numbered stages:
   
   ```
   01)RuntimeOptimizerExec
   02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, group_key@0)], projection=[group_key@2, sum_payload@3, payload@1]
   03)----CoalescePartitionsExec
   04)------DataSourceExec: partitions=4, partition_sizes=[1, 0, 0, 0]
   05)----ProjectionExec: expr=[group_key@0 as group_key, sum(big.payload)@1 as sum_payload]
   06)------PipelineBreakerBuffer
   07)--------AggregateExec: mode=FinalPartitioned, gby=[group_key@0 as group_key], aggr=[sum(big.payload)]
   08)----------RepartitionExec: partitioning=Hash([group_key@0], 4), input_partitions=4
   09)------------PipelineBreakerBuffer
   10)--------------AggregateExec: mode=Partial, gby=[group_key@0 as group_key], aggr=[sum(big.payload)]
   11)----------------DataSourceExec: partitions=4, partition_sizes=[4, 3, 3, 3]
   ```
   
   The stages just execute within an existing `Arc<dyn ExecutionPlan>`, which is rewritten at stage boundaries (obviously not the stages already in progress).
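The following minimal, self-contained Rust sketch makes the "rewrite only at boundaries" rule concrete. It does not use DataFusion's APIs. `StageState`, `Stage`, and `rewrite_pending` are hypothetical names, and a stage's operator is modeled as a plain string rather than an `Arc<dyn ExecutionPlan>`. Only stages that have not started are offered to the rewrite. Running and completed stages keep their original plan.

```rust
/// Hypothetical lifecycle of one stage (the subtree below a pipeline breaker).
/// Illustrative only; not a DataFusion type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StageState {
    Pending,
    Running,
    Completed,
}

/// Hypothetical, simplified stage: an id, its state, and a textual
/// description of the operator the planner chose for it.
#[derive(Debug, Clone)]
pub struct Stage {
    pub id: usize,
    pub state: StageState,
    pub operator: String,
}

/// Offer every not-yet-started stage to `rewrite`. Running and completed
/// stages are never touched. Returns how many stages were changed.
pub fn rewrite_pending<F>(stages: &mut [Stage], mut rewrite: F) -> usize
where
    F: FnMut(&Stage) -> Option<String>,
{
    let mut changed = 0;
    for stage in stages.iter_mut() {
        if stage.state != StageState::Pending {
            continue; // in-progress or finished work is left alone
        }
        if let Some(new_operator) = rewrite(&*stage) {
            stage.operator = new_operator;
            changed += 1;
        }
    }
    changed
}
```

For example, once upstream row counts show the build side is large, a pending `HashJoinExec: mode=CollectLeft` stage could be switched to `mode=Partitioned`. The aggregate stages that are already running would be unaffected.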
   
   > some coordinator that knows about all hash sizes, allocations, stats
   
   See `RuntimeOptimizerExec`, the root node (01) of the plan above.
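One way to picture the coordinator role: collector nodes report observed row counts into a registry shared with the root node, and the root consults it before later stages are instantiated. This sketch is hypothetical and is not `RuntimeOptimizerExec`'s actual code. `StatsRegistry`, `BuildSide`, and `choose_build_side` are illustrative names. The decision rule (hash the smaller observed input, otherwise keep the planned side) is just one example of a runtime decision.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical stats registry shared between the root coordinator and
/// the collector nodes below it. Cloning shares the same underlying map.
#[derive(Default, Clone)]
pub struct StatsRegistry {
    row_counts: Arc<Mutex<HashMap<usize, u64>>>,
}

impl StatsRegistry {
    /// Called by a collector node once its input is exhausted.
    pub fn report(&self, stage_id: usize, rows: u64) {
        self.row_counts.lock().unwrap().insert(stage_id, rows);
    }

    /// Observed row count for a stage, if it has finished.
    pub fn rows(&self, stage_id: usize) -> Option<u64> {
        self.row_counts.lock().unwrap().get(&stage_id).copied()
    }
}

/// Which join input to build the hash table on (illustrative).
#[derive(Debug, PartialEq, Eq)]
pub enum BuildSide {
    Left,
    Right,
}

/// Hypothetical rule: build on the input with fewer observed rows,
/// and fall back to the planner's choice while either count is unknown.
pub fn choose_build_side(
    reg: &StatsRegistry,
    left_stage: usize,
    right_stage: usize,
    planned: BuildSide,
) -> BuildSide {
    match (reg.rows(left_stage), reg.rows(right_stage)) {
        (Some(l), Some(r)) if r < l => BuildSide::Right,
        (Some(_), Some(_)) => BuildSide::Left,
        _ => planned,
    }
}
```

Sharing the map through `Arc<Mutex<_>>` lets each collector hold a cheap clone of the registry. The coordinator only reads it at stage boundaries.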
   
   > For distributed the stats synchronization
   
   For Ballista, I assumed this would just be: (1) recognize the stage boundary, and (2) send the collected stats to the scheduler on the stage-completion gRPC call (admittedly, this breaks down with streaming or large responses). I was hoping we could start with a dedicated stats-collector node that sits outside any operator (`PipelineBreakerBuffer` does this with `row_count` in the PR, `SortExtrema` in another, and there's no reason not to add more, e.g. KLL quantile sketches). It's an interesting open question whether we can crack open existing operators and extract their state, or whether those concerns are intractable.
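A dedicated collector node can be sketched as a pipeline breaker that buffers its whole input, records stats while doing so, and then replays the batches unchanged. This is a simplified, hypothetical model. Batches are plain `Vec<i64>` instead of Arrow `RecordBatch`es. `StatsBuffer` and `StageStats` are illustrative names that only loosely mirror what `PipelineBreakerBuffer` (row count) and `SortExtrema` (min/max) collect in the PoCs.

```rust
/// Hypothetical stand-in for a record batch: a single i64 column.
pub type Batch = Vec<i64>;

/// Stats known only after the whole input has been consumed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StageStats {
    pub row_count: u64,
    /// (min, max) of the column, `SortExtrema`-style; `None` if no rows.
    pub extrema: Option<(i64, i64)>,
}

/// Hypothetical stats-collecting pipeline breaker: it buffers every batch
/// and updates the stats incrementally as batches arrive.
#[derive(Default)]
pub struct StatsBuffer {
    batches: Vec<Batch>,
    row_count: u64,
    extrema: Option<(i64, i64)>,
}

impl StatsBuffer {
    /// Accept one input batch, updating row count and min/max.
    pub fn push(&mut self, batch: Batch) {
        self.row_count += batch.len() as u64;
        for &v in &batch {
            self.extrema = Some(match self.extrema {
                None => (v, v),
                Some((lo, hi)) => (lo.min(v), hi.max(v)),
            });
        }
        self.batches.push(batch);
    }

    /// Input exhausted: return the stats (to report at the stage boundary)
    /// and the buffered batches (to replay downstream unchanged).
    pub fn finish(self) -> (StageStats, Vec<Batch>) {
        let stats = StageStats {
            row_count: self.row_count,
            extrema: self.extrema,
        };
        (stats, self.batches)
    }
}
```

In the Ballista flow described above, a payload like `StageStats` is what would ride along on the stage-completion call. A richer collector, such as a KLL sketch for quantiles, would slot in next to the row count and extrema.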
   
   > Is the scope to come up with some universal trait/approach that would shape the AQE for single machine and distributed?
   
   Yes, that is how I interpreted Andrew's request. This adds true AQE to DataFusion without much disruption (comparatively speaking :), and defers distribution concerns, and only those, to downstream implementations.
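The split between core AQE and distribution could be expressed as a single reporting hook that downstream projects implement. The trait and both implementations below are hypothetical. `LocalSink` keeps stats in memory for an in-process coordinator. `RemoteSink` uses a `std::sync::mpsc` channel as a stand-in for a stage-completion RPC to a scheduler such as Ballista's.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

/// Hypothetical hook: core AQE only needs somewhere to send stage stats.
pub trait StatsSink: Send + Sync {
    fn report(&self, stage_id: usize, row_count: u64);
}

/// Single-process implementation: stats stay in memory for the local
/// coordinator to read.
#[derive(Default)]
pub struct LocalSink {
    pub seen: Mutex<HashMap<usize, u64>>,
}

impl StatsSink for LocalSink {
    fn report(&self, stage_id: usize, row_count: u64) {
        self.seen.lock().unwrap().insert(stage_id, row_count);
    }
}

/// Stand-in for a distributed implementation: the channel plays the role
/// of the stage-completion RPC to a remote scheduler.
pub struct RemoteSink {
    tx: Mutex<Sender<(usize, u64)>>,
}

impl RemoteSink {
    /// Returns the sink plus the "scheduler" end of the channel.
    pub fn connect() -> (Self, Receiver<(usize, u64)>) {
        let (tx, rx) = channel();
        (RemoteSink { tx: Mutex::new(tx) }, rx)
    }
}

impl StatsSink for RemoteSink {
    fn report(&self, stage_id: usize, row_count: u64) {
        // Ignore the error if the receiving "scheduler" has gone away.
        let _ = self.tx.lock().unwrap().send((stage_id, row_count));
    }
}

/// Engine-side code is written once, against the trait only.
pub fn finish_stage(sink: &dyn StatsSink, stage_id: usize, row_count: u64) {
    sink.report(stage_id, row_count);
}
```

Because engine code only calls `finish_stage` against `&dyn StatsSink`, a distributed engine could swap in its own sink without changing the operators themselves.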

