avantgardnerio commented on PR #23167:
URL: https://github.com/apache/datafusion/pull/23167#issuecomment-4803250969
Thanks again for the feedback! I think it has helped me shape this into a
much more AQE-looking PR:
0. The PR is now functional: it actually flips the join sides, based on
`runtime_row_count()`, and the SLT passes just like it did without AQE.
1. `PipelineBreakerBuffer` is now `StageBoundaryBuffer`, and it actually
buffers. This allows us to insert it even on the left (streaming side) of the
example SLT HashJoin.
2. Each stage boundary is assigned a stage number. Within a stage, all leaves
execute in parallel; the stages themselves execute sequentially.
3. RuntimeOptimizer calls `prime()` on each `StageBoundaryBuffer` to kick it
off in its own tokio task, much like how `RepartitionExec` polls its children
today. This skips the normal DataFusion poll-based flow in order to execute
subtrees incrementally and bottom-up.
4. `RuntimeRule` now has the exact same signature as
`PhysicalOptimizerRule`, which hopefully shows that the two can be unified (it
just requires refactoring into a common module). Rules would have to be made
"runtime aware" one at a time, but this gives us a migration path to AQE.
5. `StageBoundaryBuffer` is its own pipeline breaker: in this PR it stores
everything in an unbounded channel. I think in the future we could `prime()` it
until it hits a memory threshold, then give up and release it to start
streaming. If that happens we would no longer be able to get runtime stats and
modify the plan, but at least it would be "no worse" than today.
```
RTO: stage 0 ready (2 boundaries); firing 1 rule(s) before release
SwapBuildSideIfInverted: flipping HashJoinExec — current build (left) = 100 rows, probe (right) = 5 rows.
RTO: stage 0 released; downstream consumers can now drain ...
```
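To make the decision in that log concrete, here is a minimal sketch of the kind of check a rule like `SwapBuildSideIfInverted` could perform once both boundaries have reported their row counts. The types `BuildSide` and `ObservedRows` and the function `choose_build_side` are hypothetical names for illustration, not the PR's actual API; the only assumption is that the rule compares the observed build-side count with the probe-side count and flips when the build side is larger.

```rust
/// Which input of a hash join currently serves as the build side.
/// (Hypothetical type, for illustration only.)
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BuildSide {
    Left,
    Right,
}

/// Row counts observed at the two stage boundaries feeding the join.
/// (Hypothetical type; the PR obtains such counts via `runtime_row_count()`.)
#[derive(Debug, Clone, Copy)]
pub struct ObservedRows {
    pub left: usize,
    pub right: usize,
}

/// Returns the side that should build the hash table: the smaller input.
/// On a tie the current choice is kept, so the plan is not rewritten needlessly.
pub fn choose_build_side(current: BuildSide, rows: ObservedRows) -> BuildSide {
    let (build, probe) = match current {
        BuildSide::Left => (rows.left, rows.right),
        BuildSide::Right => (rows.right, rows.left),
    };
    if build > probe {
        // The build side turned out larger than the probe side: flip.
        match current {
            BuildSide::Left => BuildSide::Right,
            BuildSide::Right => BuildSide::Left,
        }
    } else {
        current
    }
}
```

With the counts from the log (build on the left with 100 rows, probe on the right with 5 rows), `choose_build_side(BuildSide::Left, ObservedRows { left: 100, right: 5 })` returns `BuildSide::Right`, which corresponds to the flip reported above.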
```
01)RuntimeOptimizerExec
02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, group_key@0)], projection=[group_key@2, sum_payload@3, payload@1]
03)----StageBoundaryBuffer: stage=0
04)------CoalescePartitionsExec
05)--------DataSourceExec: partitions=4, partition_sizes=[1, 0, 0, 0]
06)----StageBoundaryBuffer: stage=0
07)------ProjectionExec: expr=[group_key@0 as group_key, sum(big.payload)@1 as sum_payload]
08)--------AggregateExec: mode=FinalPartitioned, gby=[group_key@0 as group_key], aggr=[sum(big.payload)]
09)----------RepartitionExec: partitioning=Hash([group_key@0], 4), input_partitions=4
10)------------AggregateExec: mode=Partial, gby=[group_key@0 as group_key], aggr=[sum(big.payload)]
11)--------------DataSourceExec: partitions=4, partition_sizes=[4, 3, 3, 3]
```
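For readers who want a feel for the priming step, the following is a rough, std-only sketch of the idea: each boundary of a stage drains its child on a background worker into an unbounded channel while counting rows, and the stage is considered ready once every boundary has finished. It uses OS threads and `std::sync::mpsc` in place of tokio tasks, models a batch as a `Vec<i64>` instead of an Arrow record batch, and the names `BoundaryBuffer`, `prime`, `finish`, and `run_stage` are illustrative assumptions rather than the PR's actual types.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread::{self, JoinHandle};

/// A batch is modeled as a vector of rows (assumption: real code would use
/// Arrow record batches).
pub type Batch = Vec<i64>;

/// Hypothetical stand-in for a stage boundary that buffers its child's output.
pub struct BoundaryBuffer {
    pub stage: usize,
    rx: Receiver<Batch>,
    worker: JoinHandle<usize>,
}

impl BoundaryBuffer {
    /// Start draining `child` on a background thread into an unbounded
    /// channel, counting rows as they pass through.
    pub fn prime<I>(stage: usize, child: I) -> Self
    where
        I: Iterator<Item = Batch> + Send + 'static,
    {
        let (tx, rx) = channel();
        let worker = thread::spawn(move || {
            let mut rows = 0;
            for batch in child {
                rows += batch.len();
                // Sending only fails if the receiving end was dropped.
                if tx.send(batch).is_err() {
                    break;
                }
            }
            rows
        });
        Self { stage, rx, worker }
    }

    /// Wait until the child is exhausted, then hand back the observed row
    /// count together with the receiver that downstream consumers drain.
    pub fn finish(self) -> (usize, Receiver<Batch>) {
        let rows = self.worker.join().expect("priming thread panicked");
        (rows, self.rx)
    }
}

/// Prime every boundary of one stage in parallel, then wait for all of them.
/// Calling this once per stage, in order, gives sequential stages.
pub fn run_stage(
    stage: usize,
    children: Vec<Box<dyn Iterator<Item = Batch> + Send>>,
) -> Vec<(usize, Receiver<Batch>)> {
    // Collect first so that all workers are started before any is joined.
    let buffers: Vec<BoundaryBuffer> = children
        .into_iter()
        .map(|child| BoundaryBuffer::prime(stage, child))
        .collect();
    buffers.into_iter().map(BoundaryBuffer::finish).collect()
}
```

In this sketch the row counts returned by `run_stage` are what a runtime rule would inspect before the receivers are released to downstream operators. Because the channel is unbounded, memory use grows with the child's output, which is the limitation the memory-threshold idea in item 5 is meant to address.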
Hopefully this makes clear that the PR is a migration path and a generalizable
solution. So what's missing? If we keep going down this route, would we be able
to (1) land it today and (2) eventually get to "full" AQE?