milenkovicm opened a new issue, #1359:
URL: https://github.com/apache/datafusion-ballista/issues/1359

   ## Executive Summary
   
   - Create a distributed query planner that can change stages (add, remove, or cancel stages, or change their partitioning) to reflect collected runtime statistics.
   - Make the distributed query planner extensible: adding a new optimization should be as simple as adding a new physical rule.
   
   ## Overview
   
   Currently, jobs in Ballista are divided into stages at job submission time according to predefined rules.
   Ballista can optimize stages based on runtime information using a set of physical optimizer rules, for example changing the join order. This has been [implemented here](https://github.com/milenkovicm/datafusion-ballista/blob/26fcbf66233de2c6d12545277242e16b4ede386a/ballista/scheduler/src/state/execution_stage.rs#L372-L378) with only a few physical optimizer rules used.
   
   A few more rules are scattered around [here](https://github.com/milenkovicm/datafusion-ballista/blob/26fcbf66233de2c6d12545277242e16b4ede386a/ballista/scheduler/src/planner.rs#L130) and [here](https://github.com/milenkovicm/datafusion-ballista/blob/26fcbf66233de2c6d12545277242e16b4ede386a/ballista/scheduler/src/planner.rs#L162).
   
   The current approach may fail to generate a valid plan when there is a [dependency between physical optimizers](https://github.com/apache/datafusion-ballista/issues/1296). Furthermore, some optimizations can't be propagated across stage boundaries, potentially impacting overall performance.
   
   The physical optimizers in use work well, with one major downside: the rules are scattered across the codebase, making adding or removing a rule a non-trivial task. Scattered rules are hard to control, so a centralized rule "facility" is needed. It would make it easier to change or alter the behavior of physical planners, or to experiment with new solutions.
   
   Another major problem arises once stages are split: it becomes impossible to change their layout, which can be necessary in certain situations. For instance, if an exchange produces an empty result, we'd still need to run a subsequent stage even though the system knows it will also produce an empty result. This can lead to significant overhead on the scheduler, especially with many tasks. Having the ability to remove a stage would be helpful in this scenario.
   
   ### Proposal
   
   Implement an adaptive query planner that addresses some of the issues with the current approach. At the core of the design are pluggable physical optimizer rules which re-optimize the plan after each stage.
   
   This proposal is strongly influenced by the [Adaptive and Robust Query Execution for Lakehouses at Scale](https://www.cs.cmu.edu/~15721-f24/papers/AQP_in_Lakehouse.pdf) paper. The major difference is that Ballista works on the physical plan rather than the logical plan.
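
   To make the rule-centric design concrete, here is a minimal sketch of what a pluggable rule API could look like. The names `AdaptivePlannerRule` and `StageStatistics` are illustrative, not an existing Ballista API:

   ```rust
   use std::sync::Arc;

   use datafusion::common::Result;
   use datafusion::physical_plan::ExecutionPlan;

   /// Runtime statistics collected from a completed stage (illustrative).
   pub struct StageStatistics {
       pub stage_id: usize,
       pub output_rows: usize,
       pub output_bytes: usize,
   }

   /// A pluggable rule that re-optimizes the remaining physical plan after
   /// each stage completes, analogous to DataFusion's `PhysicalOptimizerRule`
   /// but fed with runtime statistics instead of estimates.
   pub trait AdaptivePlannerRule: Send + Sync {
       /// Rewrite `plan` using statistics from the stages completed so far.
       fn optimize(
           &self,
           plan: Arc<dyn ExecutionPlan>,
           completed: &[StageStatistics],
       ) -> Result<Arc<dyn ExecutionPlan>>;

       /// Human-readable rule name, for logging and debugging.
       fn name(&self) -> &str;
   }
   ```

   Adding a new optimization would then mean implementing this trait and registering the rule with the planner, much like adding a physical optimizer rule in DataFusion today.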
   
   ### Impact on DataFusion
   
   Currently, in DataFusion, physical optimizer rules are run once per plan execution. With this proposal, the whole set of rules will be run after each stage, so if a physical optimizer rule is not idempotent it may keep adding physical plan exec nodes even when they have already been added.
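
   For example, a rule that wraps nodes must first check whether the node is already wrapped. A minimal sketch of such an idempotency guard, using DataFusion's `CoalesceBatchesExec` purely as an illustration:

   ```rust
   use std::sync::Arc;

   use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
   use datafusion::physical_plan::ExecutionPlan;

   /// Wrap `plan` in a `CoalesceBatchesExec` unless it is already wrapped,
   /// so re-running the rule after every stage stays a no-op.
   fn ensure_coalesced(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
       if plan.as_any().downcast_ref::<CoalesceBatchesExec>().is_some() {
           plan // already wrapped: nothing to add
       } else {
           Arc::new(CoalesceBatchesExec::new(plan, 8192))
       }
   }
   ```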
   
   Physical optimizer rules are also single-pass, which may not work well with this approach (see "Adding New Stage to Running Plan" below, where a new stage is added after join reordering).
   
   ### Impact on Ballista
   
   Stage planning in Ballista revolves around `ExecutionGraph`, which prescribes a translation from a physical plan to a set of stages executing that plan. `ExecutionGraph` was implemented as a concrete type rather than a trait, hence it's not a trivial task to replace it. `ExecutionGraph` does accept a `dyn DistributedPlanner`, but unfortunately `DistributedPlanner` returns a static list of stages, which would not work with adaptive query planning.
   
   `ExecutionGraph` is initialized in `TaskManager` when a new job is submitted.
   
   There are two approaches which we can take with `ExecutionGraph`:
   
   - Keep `ExecutionGraph` as it is and just add the code required for AQE. This approach might be possible, but it would be hard to maintain and probably messy to implement.
   - Rename `ExecutionGraph` to `StaticExecutionGraph` and introduce an `ExecutionGraph` trait (used as `dyn ExecutionGraph`), implemented by both `StaticExecutionGraph` and a new `AdaptiveExecutionGraph`; `TaskManager` would choose the implementation based on a predefined configuration parameter (see the sketch after this list). This approach looks like the easier route to take. It would enable us to have the two implementations running in parallel until the AQE implementation matures.
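
   A rough sketch of that split follows; the method set is illustrative, and the real trait would mirror the current `ExecutionGraph` surface:

   ```rust
   use datafusion::common::Result;

   /// The behaviour `TaskManager` needs from a graph. Both implementations
   /// would satisfy it (illustrative, not the current Ballista API).
   pub trait ExecutionGraph: Send + Sync {
       /// Stages that are currently runnable.
       fn runnable_stages(&self) -> Vec<usize>;

       /// Notify the graph that a stage finished, so it can (re-)plan
       /// the remaining work before handing out more tasks.
       fn on_stage_completed(&mut self, stage_id: usize) -> Result<()>;
   }

   /// Today's `ExecutionGraph`, renamed: stages are fixed at submission time.
   pub struct StaticExecutionGraph { /* existing fields */ }

   /// New graph that re-optimizes the remaining plan after each stage.
   pub struct AdaptiveExecutionGraph { /* plan + adaptive rules */ }
   ```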
   
   The adaptive planner will need some time to get right. It will coexist with the current distributed planner for the foreseeable future: it will be disabled by default, with a configuration option to turn it on.
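
   A sketch of how the toggle might look, assuming a `BallistaConfig`-style key/value builder; the key name `ballista.optimizer.adaptive_planning` is hypothetical:

   ```rust
   use ballista::prelude::BallistaConfig;

   // Hypothetical key: the adaptive planner stays off unless explicitly enabled.
   let config = BallistaConfig::builder()
       .set("ballista.optimizer.adaptive_planning", "true")
       .build()?;
   ```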
   
   ## Plan Arithmetic
   
   ### Adding New Stage to Running Plan
   
   Some optimizations may require an exchange to be inserted into a running plan; see [bug 1](https://github.com/apache/datafusion-ballista/pull/1302) and [bug 2](https://github.com/apache/datafusion-ballista/issues/1321).
   
   ```text
   ┌───────────────────────────┐
   │   AdaptiveDatafusionExec  │
   │    --------------------   │
   │      is_final: false      │
   │         plan_id: 1        │
   │                           │
   │          stage_id         │
   └─────────────┬─────────────┘
   ┌─────────────┴─────────────┐
   │       CrossJoinExec       ├──────────────┐
   └─────────────┬─────────────┘              │
   ┌─────────────┴─────────────┐┌─────────────┴─────────────┐
   │   CoalescePartitionsExec  ││      CooperativeExec      │
   │                           ││    --------------------   │
   │                           ││      CooperativeExec      │
   └─────────────┬─────────────┘└─────────────┬─────────────┘
   ┌─────────────┴─────────────┐┌─────────────┴─────────────┐
   │        ExchangeExec       ││         File Scan         │
   │    --------------------   ││    --------------------   │
   │       partitioning:       ││     num_partitions: 2     │
   │    Hash([big_col@0], 2)   ││                           │
   │                           ││        statistics:        │
   │         plan_id: 0        ││ [Rows=Inexact(1024), Bytes│
   │       stage_id: None      ││ =Inexact(8192), [(Col[0]: │
   │   stage_resolved: false   ││            )]]            │
   └─────────────┬─────────────┘└───────────────────────────┘
   ┌─────────────┴─────────────┐
   │      CooperativeExec      │
   │    --------------------   │
   │      CooperativeExec      │
   └─────────────┬─────────────┘
   ┌─────────────┴─────────────┐
   │         File Scan         │
   └───────────────────────────┘
   ```
   
   The plan after join reordering and the new stage addition (`plan_id: 3`) looks similar to:
   
   ```text
   ┌───────────────────────────┐
   │   AdaptiveDatafusionExec  │
   │    --------------------   │
   │       is_final: true      │
   │         plan_id: 1        │
   │        stage_id: 2        │
   └─────────────┬─────────────┘
   ┌─────────────┴─────────────┐
   │       ProjectionExec      │
   │    --------------------   │
   │      big_col: big_col     │
   └─────────────┬─────────────┘
   ┌─────────────┴─────────────┐
   │       CrossJoinExec       ├──────────────┐
   └─────────────┬─────────────┘              │
   ┌─────────────┴─────────────┐┌─────────────┴─────────────┐
   │   CoalescePartitionsExec  ││        ExchangeExec       │
   │                           ││    --------------------   │
   │                           ││       partitioning:       │
   │                           ││    Hash([big_col@0], 2)   │
   │                           ││                           │
   │                           ││         plan_id: 0        │
   │                           ││     stage_id: 0           │
   │                           ││    stage_resolved: true   │
   └─────────────┬─────────────┘└─────────────┬─────────────┘
   ┌─────────────┴─────────────┐┌─────────────┴─────────────┐
   │        ExchangeExec       ││      CooperativeExec      │
   │    --------------------   ││    --------------------   │
   │       partitioning:       ││      CooperativeExec      │
   │           None            ││                           │
   │                           ││                           │
   │         plan_id: 3        ││                           │
   │     stage_id: 1           ││                           │
   │    stage_resolved: true   ││                           │
   └─────────────┬─────────────┘└─────────────┬─────────────┘
   ┌─────────────┴─────────────┐┌─────────────┴─────────────┐
   │      CooperativeExec      ││         File Scan         │
   │    --------------------   ││                           │
   │      CooperativeExec      ││                           │
   └─────────────┬─────────────┘└───────────────────────────┘
   ┌─────────────┴─────────────┐
   │         File Scan         │
   │    --------------------   │
   │     num_partitions: 2     │
   │                           │
   │        statistics:        │
   │ [Rows=Inexact(1024), Bytes│
   │ =Inexact(8192), [(Col[0]: │
   │            )]]            │
   └───────────────────────────┘
   ```
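
   A rough sketch of the rewrite that inserts the new exchange above a subtree, making it a separate stage. `ExchangeExec` here is the proposal's exchange node and its `new` constructor is hypothetical; the rest is the DataFusion `ExecutionPlan` API:

   ```rust
   use std::sync::Arc;

   use datafusion::physical_plan::ExecutionPlan;

   /// Insert a new exchange above `child`, keeping the child's output
   /// partitioning. The exchange boundary becomes the new stage
   /// (`plan_id: 3` in the plan above). `ExchangeExec::new` is a
   /// hypothetical constructor for the proposal's exchange node.
   fn insert_exchange(child: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
       let partitioning = child.properties().output_partitioning().clone();
       Arc::new(ExchangeExec::new(child, partitioning))
   }
   ```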
   
   ### Removing Stages from a Running Plan
   
   Apart from adding stages, stages should be removed if they will produce an empty result.
   This reduces pressure on the scheduler, as it can skip scheduling their tasks.
   
   Take, for example, the plan:
   
   ```text
   [stage 2] --------------------------------------------------------------------------------------------------
   AdaptiveDatafusionExec: is_final=false, plan_id=2, stage_id=pending
     ProjectionExec: expr=[c0@0 as c0, count(Int64(1))@1 as count(*)]
       AggregateExec: mode=FinalPartitioned, gby=[c0@0 as c0], aggr=[count(Int64(1))]
         CoalesceBatchesExec: target_batch_size=8192
          [stage 1] -------------------------------------------------------------------------------------------
           ExchangeExec: partitioning=Hash([c0@0], 2), plan_id=1, stage_id=pending, stage_resolved=false
             AggregateExec: mode=Partial, gby=[c0@0 as c0], aggr=[count(Int64(1))]
               ProjectionExec: expr=[min(t.a)@1 as c0]
                 AggregateExec: mode=FinalPartitioned, gby=[c@0 as c], aggr=[min(t.a)]
                   CoalesceBatchesExec: target_batch_size=8192
                    [stage 0] ---------------------------------------------------------------------------------
                     ExchangeExec: partitioning=Hash([c@0], 2), plan_id=0, stage_id=pending, stage_resolved=false
                       AggregateExec: mode=Partial, gby=[c@1 as c], aggr=[min(t.a)]
                         CoalesceBatchesExec: target_batch_size=8192
                           FilterExec: a@0 = 42
                             DataSourceExec: partitions=1, partition_sizes=[1]
   ```
   
   Running the first stage produces no results:
   
   ```text
   ShuffleWriterExec: partitioning:Hash([c@0], 2)
     AggregateExec: mode=Partial, gby=[c@1 as c], aggr=[min(t.a)]
       CoalesceBatchesExec: target_batch_size=8192
         FilterExec: a@0 = 42
           DataSourceExec: partitions=1, partition_sizes=[1]
   ```
   
   Statistics information will be used by the adaptive planner to skip all other stages and just return an empty result:
   
   ```text
   ┌───────────────────────────┐
   │     ShuffleWriterExec     │
   │    --------------------   │
   │     partitioning: None    │
   └─────────────┬─────────────┘
   ┌─────────────┴─────────────┐
   │         EmptyExec         │
   └───────────────────────────┘
   ```
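
   A minimal sketch of the pruning decision, assuming the planner can read exact row counts from a finished stage's statistics; `EmptyExec` is DataFusion's zero-row operator:

   ```rust
   use std::sync::Arc;

   use datafusion::common::stats::Precision;
   use datafusion::common::Result;
   use datafusion::physical_plan::{empty::EmptyExec, ExecutionPlan};

   /// If a finished stage is known to have produced zero rows, every plan
   /// consuming it can collapse to an `EmptyExec` with the consumer's schema.
   fn prune_if_empty(
       stage_output: &dyn ExecutionPlan,
       consumer: Arc<dyn ExecutionPlan>,
   ) -> Result<Arc<dyn ExecutionPlan>> {
       let stats = stage_output.statistics()?;
       if stats.num_rows == Precision::Exact(0) {
           Ok(Arc::new(EmptyExec::new(consumer.schema())))
       } else {
           Ok(consumer)
       }
   }
   ```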
   
   ### Coalesce Shuffle Partitions (Elastic Shuffle Parallelism)
   
   Should shuffle partitions produce very little data, the planner should coalesce partitions, reducing the number of tasks needed to produce results. The rule modifies the partitioning specification in the ShuffleRead operator.
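
   The partition-count arithmetic itself is simple. A sketch, assuming a configurable target size per post-coalesce partition:

   ```rust
   /// Pick a new (smaller) shuffle partition count so that each task reads
   /// roughly `target_bytes` of shuffle data; never go below one partition
   /// and never increase parallelism.
   fn coalesced_partition_count(
       total_shuffle_bytes: usize,
       target_bytes: usize,
       current_partitions: usize,
   ) -> usize {
       let desired = total_shuffle_bytes.div_ceil(target_bytes).max(1);
       desired.min(current_partitions)
   }

   // For example, 2 MiB of shuffle output with a 64 MiB target collapses
   // 200 shuffle partitions into a single task:
   // coalesced_partition_count(2 << 20, 64 << 20, 200) == 1
   ```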
   
   ### Canceling Redundant (Running) Stages
   
   Adding or removing stages may create the need to cancel already running stages. This can happen, for example, if some stage produces an empty result, making other stages unnecessary, or if a semantically equal but superior plan is found (this case needs further investigation).
   
   Take, for example, the plan:
   
   ```text
   AdaptiveDatafusionExec: is_final=false, plan_id=2, stage_id=pending
     CoalesceBatchesExec: target_batch_size=8192
       HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)], projection=[a@1, b@2, c@3]
         CoalesceBatchesExec: target_batch_size=8192
           ExchangeExec: partitioning=Hash([a@0], 2), plan_id=0, stage_id=0, stage_resolved=false
             CoalesceBatchesExec: target_batch_size=8192
               FilterExec: b@1 = 42, projection=[a@0]
                 DataSourceExec: partitions=1, partition_sizes=[1]
         CoalesceBatchesExec: target_batch_size=8192
           ExchangeExec: partitioning=Hash([a@0], 2), plan_id=1, stage_id=1, stage_resolved=false
             DataSourceExec: partitions=1, partition_sizes=[1]
   ```
   
   We have two runnable stages: the build side of the join, `ExchangeExec: partitioning=Hash([a@0], 2), plan_id=0, stage_id=0, stage_resolved=false`, and the probe side, `ExchangeExec: partitioning=Hash([a@0], 2), plan_id=1, stage_id=1, stage_resolved=false`.

   Let's say the probe side needs some time to finish.
   
   If the build side, stage 0, finishes first with its shuffle producing no data, the hash join will produce no data as well, so the whole hash join could be replaced with `EmptyExec`. Something like:
   
   ```text
   AdaptiveDatafusionExec: is_final=false, plan_id=2, stage_id=pending
     EmptyExec
   ```
   
   The plan could not finish until both stages finished, increasing overall result latency. Cancelling redundant stages would make sense, as it would reduce cluster utilization (and result latency). *(Actually, the current implementation will produce the final result and keep stage 1 running.)*
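
   A sketch of the cancellation hook, assuming the adaptive graph can diff the set of running stages against the stages the re-optimized plan still references (all names here are hypothetical; this is not an existing Ballista API):

   ```rust
   use std::collections::HashSet;

   /// After re-optimization, any running stage the new plan no longer
   /// references is redundant and can be cancelled.
   fn cancel_redundant_stages(
       running: &HashSet<usize>,
       referenced_by_new_plan: &HashSet<usize>,
       cancel_stage: impl Fn(usize),
   ) {
       for stage_id in running.difference(referenced_by_new_plan) {
           // e.g. stage 1 (the probe side) once the join collapses to EmptyExec
           cancel_stage(*stage_id);
       }
   }
   ```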
   
   

