milenkovicm opened a new issue, #1359: URL: https://github.com/apache/datafusion-ballista/issues/1359
## Executive Summary

- Create a distributed query planner which can change stages at runtime: add or remove stages, change stage partitioning, or cancel running stages, to reflect collected runtime statistics.
- Make the distributed query planner easy to extend: adding a new behaviour should be as simple as adding a new physical optimizer rule.

## Overview

Currently, jobs in Ballista are divided into stages at job submission time according to predefined rules. Ballista can optimize stages based on runtime information using a set of physical optimizers, changing the join order for example. This has been [implemented here](https://github.com/milenkovicm/datafusion-ballista/blob/26fcbf66233de2c6d12545277242e16b4ede386a/ballista/scheduler/src/state/execution_stage.rs#L372-L378) with only a few physical optimizer rules used. A few more rules are scattered around [here](https://github.com/milenkovicm/datafusion-ballista/blob/26fcbf66233de2c6d12545277242e16b4ede386a/ballista/scheduler/src/planner.rs#L130) and [here](https://github.com/milenkovicm/datafusion-ballista/blob/26fcbf66233de2c6d12545277242e16b4ede386a/ballista/scheduler/src/planner.rs#L162).

The current approach may fail to generate a valid plan when there is a [dependency between physical optimizers](https://github.com/apache/datafusion-ballista/issues/1296). Furthermore, some optimizations can't be propagated across stage boundaries, potentially impacting overall performance.

The physical optimizers used work well, with one major downside: the rules are scattered across the code base, making adding or removing a rule a non-trivial task. Scattered rules are hard to control, so a centralized rule "facility" is needed. It would make it easier to change or alter the behavior of the physical planners, or to experiment with new solutions.

Another major problem arises once a plan has been split into stages: it becomes impossible to change their layout, which can be necessary in certain situations. For instance, if an exchange produces an empty result, we'd still need to run the subsequent stage even though the system knows it will also produce an empty result. This can lead to significant overhead on the scheduler, especially with many tasks. Having the ability to remove a stage would be helpful in this scenario.

### Proposal

Implement an adaptive query planner which would address some of the issues with the current approach. At the core of the design are pluggable physical optimizer rules which re-optimize the plan after each stage completes.

This proposal is strongly influenced by the [Adaptive and Robust Query Execution for Lakehouses at Scale](https://www.cs.cmu.edu/~15721-f24/papers/AQP_in_Lakehouse.pdf) paper. The major difference is that Ballista works on the physical plan rather than the logical plan.

### Impact on DataFusion

Currently, in DataFusion, physical optimizer rules are run once per plan execution. With this proposal, the whole rule set will be re-run after each stage, so rules that are not idempotent may keep adding physical exec nodes that have already been added. Physical optimizer rules are also single-pass, which may not work well with this approach (see "Adding New Stage to Running Plan", after join reordering, further below).
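To make the concern concrete, here is a minimal sketch of the per-stage re-optimization loop, assuming DataFusion's `PhysicalOptimizerRule` trait; `reoptimize_after_stage` is a hypothetical helper, not an existing Ballista or DataFusion API:

```rust
use std::sync::Arc;

use datafusion::config::ConfigOptions;
use datafusion::error::Result;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;

/// Re-run the whole rule set over the remaining plan once a stage completes.
/// Runtime statistics from finished stages (e.g. exact row counts on an
/// exchange) are assumed to already be reflected in `plan`.
fn reoptimize_after_stage(
    plan: Arc<dyn ExecutionPlan>,
    rules: &[Arc<dyn PhysicalOptimizerRule + Send + Sync>],
    config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
    let mut plan = plan;
    for rule in rules {
        // Unlike the single optimization pass DataFusion does today, this
        // runs after *every* stage, so a rule that is not idempotent may
        // insert the same exec node repeatedly.
        plan = rule.optimize(plan, config)?;
    }
    Ok(plan)
}
```

Because the same rule set sees the plan again after every stage, each rule either needs to be idempotent or needs to detect nodes it has already inserted.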
### Impact on Ballista

Stage planning in Ballista revolves around `ExecutionGraph`, which prescribes a translation from a physical plan to a set of stages executing that plan. `ExecutionGraph` is implemented as a concrete struct, hence it's not a trivial task to replace it. `ExecutionGraph` does accept a `dyn DistributedPlanner`, but unfortunately `DistributedPlanner` returns a static list of stages, which would not work with adaptive query planning. `ExecutionGraph` is initialized in `TaskManager` when a new job is submitted.

There are two approaches we could take with `ExecutionGraph`:

- Keep `ExecutionGraph` as it is and just add the code required for AQE. This might be possible, but it would be hard to maintain and probably messy to implement.
- Rename `ExecutionGraph` to `StaticExecutionGraph` and introduce a `dyn ExecutionGraph` trait implemented by both `StaticExecutionGraph` and a new `AdaptiveExecutionGraph`, with `TaskManager` choosing the implementation based on a predefined configuration parameter (a rough sketch of the trait split is shown after this list). This looks like the easier route to take, and it would let the two implementations run in parallel until the AQE implementation matures.

The adaptive planner will need some time to get right. It will coexist with the current distributed planner for the foreseeable future; it will be disabled by default, with a configuration option to turn it on.
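A rough sketch of what the second option could look like; the trait surface below is hypothetical and only illustrates the split, it is not an existing Ballista API:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;

/// What `TaskManager` would hold instead of the concrete struct,
/// selected by a configuration parameter at job submission time.
pub trait ExecutionGraph: Send + Sync {
    /// The (possibly re-optimized) plan for a runnable stage.
    fn stage_plan(&self, stage_id: usize) -> Option<Arc<dyn ExecutionPlan>>;

    /// Called when a stage finishes. A static implementation only records
    /// completion; an adaptive one re-runs the optimizer rules over the
    /// remaining plan and may add, remove or re-partition stages.
    fn on_stage_completed(&mut self, stage_id: usize) -> Result<()>;
}

/// The current `ExecutionGraph`, renamed; stages are fixed at submission.
pub struct StaticExecutionGraph;

/// New implementation that re-plans after each completed stage.
pub struct AdaptiveExecutionGraph;
```

`TaskManager` would then construct one or the other based on the configuration flag, keeping the static path untouched while the adaptive path matures.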
## Plan Arithmetic

### Adding New Stage to Running Plan

Some optimizations may require an exchange to be inserted into a running plan; see [bug 1](https://github.com/apache/datafusion-ballista/pull/1302) and [bug 2](https://github.com/apache/datafusion-ballista/issues/1321). For example, take the plan:

```text
┌───────────────────────────┐
│  AdaptiveDatafusionExec   │
│   --------------------    │
│      is_final: false      │
│        plan_id: 1         │
│                           │
│         stage_id          │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│       CrossJoinExec       ├──────────────┐
└─────────────┬─────────────┘              │
┌─────────────┴─────────────┐┌─────────────┴─────────────┐
│  CoalescePartitionsExec   ││      CooperativeExec      │
│                           ││   --------------------    │
│                           ││      CooperativeExec      │
└─────────────┬─────────────┘└─────────────┬─────────────┘
┌─────────────┴─────────────┐┌─────────────┴─────────────┐
│       ExchangeExec        ││         File Scan         │
│   --------------------    ││   --------------------    │
│       partitioning:       ││     num_partitions: 2     │
│   Hash([big_col@0], 2)    ││                           │
│                           ││        statistics:        │
│        plan_id: 0         ││ [Rows=Inexact(1024), Bytes│
│      stage_id: None       ││ =Inexact(8192), [(Col[0]: │
│   stage_resolved: false   ││ )]]                       │
└─────────────┬─────────────┘└───────────────────────────┘
┌─────────────┴─────────────┐
│      CooperativeExec      │
│   --------------------    │
│      CooperativeExec      │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│         File Scan         │
└───────────────────────────┘
```

The plan after join reordering and the addition of a new stage (`plan_id: 3`) looks similar to:

```text
┌───────────────────────────┐
│  AdaptiveDatafusionExec   │
│   --------------------    │
│      is_final: true       │
│        plan_id: 1         │
│        stage_id: 2        │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│      ProjectionExec       │
│   --------------------    │
│     big_col: big_col      │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│       CrossJoinExec       ├──────────────┐
└─────────────┬─────────────┘              │
┌─────────────┴─────────────┐┌─────────────┴─────────────┐
│  CoalescePartitionsExec   ││       ExchangeExec        │
│                           ││   --------------------    │
│                           ││       partitioning:       │
│                           ││   Hash([big_col@0], 2)    │
│                           ││                           │
│                           ││        plan_id: 0         │
│                           ││        stage_id: 0        │
│                           ││   stage_resolved: true    │
└─────────────┬─────────────┘└─────────────┬─────────────┘
┌─────────────┴─────────────┐┌─────────────┴─────────────┐
│       ExchangeExec        ││      CooperativeExec      │
│   --------------------    ││   --------------------    │
│       partitioning:       ││      CooperativeExec      │
│           None            ││                           │
│                           ││                           │
│        plan_id: 3         ││                           │
│        stage_id: 1        ││                           │
│   stage_resolved: true    ││                           │
└─────────────┬─────────────┘└─────────────┬─────────────┘
┌─────────────┴─────────────┐┌─────────────┴─────────────┐
│      CooperativeExec      ││         File Scan         │
│   --------------------    ││                           │
│      CooperativeExec      ││                           │
└─────────────┬─────────────┘└───────────────────────────┘
┌─────────────┴─────────────┐
│         File Scan         │
│   --------------------    │
│     num_partitions: 2     │
│                           │
│        statistics:        │
│ [Rows=Inexact(1024), Bytes│
│ =Inexact(8192), [(Col[0]: │
│ )]]                       │
└───────────────────────────┘
```

### Removing Stages from a Running Plan

Apart from adding stages, stages should be removed if they will produce an empty result. This reduces pressure on the scheduler, as it can skip task scheduling altogether. Take for example the plan:

```text
[stage 2] --------------------------------------------------------------------------------------------------
AdaptiveDatafusionExec: is_final=false, plan_id=2, stage_id=pending
  ProjectionExec: expr=[c0@0 as c0, count(Int64(1))@1 as count(*)]
    AggregateExec: mode=FinalPartitioned, gby=[c0@0 as c0], aggr=[count(Int64(1))]
      CoalesceBatchesExec: target_batch_size=8192
[stage 1] -------------------------------------------------------------------------------------------
        ExchangeExec: partitioning=Hash([c0@0], 2), plan_id=1, stage_id=pending, stage_resolved=false
          AggregateExec: mode=Partial, gby=[c0@0 as c0], aggr=[count(Int64(1))]
            ProjectionExec: expr=[min(t.a)@1 as c0]
              AggregateExec: mode=FinalPartitioned, gby=[c@0 as c], aggr=[min(t.a)]
                CoalesceBatchesExec: target_batch_size=8192
[stage 0] ---------------------------------------------------------------------------------
                  ExchangeExec: partitioning=Hash([c@0], 2), plan_id=0, stage_id=pending, stage_resolved=false
                    AggregateExec: mode=Partial, gby=[c@1 as c], aggr=[min(t.a)]
                      CoalesceBatchesExec: target_batch_size=8192
                        FilterExec: a@0 = 42
                          DataSourceExec: partitions=1, partition_sizes=[1]
```

Running the first stage produces no results:

```text
ShuffleWriterExec: partitioning:Hash([c@0], 2)
  AggregateExec: mode=Partial, gby=[c@1 as c], aggr=[min(t.a)]
    CoalesceBatchesExec: target_batch_size=8192
      FilterExec: a@0 = 42
        DataSourceExec: partitions=1, partition_sizes=[1]
```

This statistics information will be used by the adaptive planner to skip all remaining stages and just return an empty result:

```text
┌───────────────────────────┐
│     ShuffleWriterExec     │
│   --------------------    │
│    partitioning: None     │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│         EmptyExec         │
└───────────────────────────┘
```

### Coalesce Shuffle Partitions (Elastic Shuffle Parallelism)

Should shuffle partitions produce very little data, the planner should coalesce partitions, reducing the number of tasks needed to produce the result. The rule modifies the partitioning specification in the ShuffleRead operator.
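A minimal sketch of the coalescing decision, assuming per-partition output sizes collected from a completed shuffle stage; `coalesced_partition_count` and the target size are illustrative names, not an existing rule:

```rust
/// Given bytes written per shuffle partition and a target partition size,
/// return how many partitions the reading side of the boundary should use.
fn coalesced_partition_count(partition_bytes: &[u64], target_bytes: u64) -> usize {
    let total: u64 = partition_bytes.iter().sum();
    // Round up to the nearest whole partition, keep at least one partition,
    // and never exceed the original partition count.
    let wanted = total.div_ceil(target_bytes.max(1)) as usize;
    wanted.clamp(1, partition_bytes.len().max(1))
}

fn main() {
    // 8 shuffle partitions produced ~1 KiB each; with a 64 MiB target the
    // adaptive planner would read them back as a single partition.
    let sizes = [1024u64; 8];
    assert_eq!(coalesced_partition_count(&sizes, 64 * 1024 * 1024), 1);
}
```

The adaptive planner would then rewrite the partitioning on the shuffle-reading side of the stage boundary to the returned count.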
### Canceling Redundant (Running) Stages

Adding or removing stages may create a need to cancel stages that are already running. This can happen, for example, if some stage produces an empty result, making other stages unnecessary, or if a semantically equal but superior plan becomes available (this case needs further investigation). Take for example the plan:

```text
AdaptiveDatafusionExec: is_final=false, plan_id=2, stage_id=pending
  CoalesceBatchesExec: target_batch_size=8192
    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)], projection=[a@1, b@2, c@3]
      CoalesceBatchesExec: target_batch_size=8192
        ExchangeExec: partitioning=Hash([a@0], 2), plan_id=0, stage_id=0, stage_resolved=false
          CoalesceBatchesExec: target_batch_size=8192
            FilterExec: b@1 = 42, projection=[a@0]
              DataSourceExec: partitions=1, partition_sizes=[1]
      CoalesceBatchesExec: target_batch_size=8192
        ExchangeExec: partitioning=Hash([a@0], 2), plan_id=1, stage_id=1, stage_resolved=false
          DataSourceExec: partitions=1, partition_sizes=[1]
```

We have two runnable stages: the build side of the join, `ExchangeExec: partitioning=Hash([a@0], 2), plan_id=0, stage_id=0, stage_resolved=false`, and the probe side, `ExchangeExec: partitioning=Hash([a@0], 2), plan_id=1, stage_id=1, stage_resolved=false`. Let's say the probe side needs some time to finish. If the build side, stage (0), finishes first with its shuffle producing no data, the hash join will produce no data either, so the whole hash join could be replaced with `EmptyExec`. Something like:

```text
AdaptiveDatafusionExec: is_final=false, plan_id=2, stage_id=pending
  EmptyExec
```

Without cancellation the plan cannot finish until both stages finish, increasing overall result latency. Cancelling redundant stages would therefore make sense, as it reduces cluster utilization (and result latency).

*(Actually, the current implementation will produce the final result and keep stage (1) running.)*
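As a hedged sketch of what the cancellation step could look like: if runtime statistics show the completed build side is empty, the planner replaces the join subtree with `EmptyExec` and asks the scheduler to cancel the still-running probe-side stage. `SchedulerHandle` and `cancel_stage` are hypothetical hooks, not existing Ballista APIs.

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::ExecutionPlan;

/// Hypothetical handle through which the adaptive planner could ask the
/// scheduler to stop a stage whose output is no longer needed.
trait SchedulerHandle {
    fn cancel_stage(&self, stage_id: usize) -> Result<()>;
}

/// If the build side of an inner join produced zero rows, replace the whole
/// join subtree with `EmptyExec` and cancel the running probe-side stage
/// instead of waiting for it to finish.
fn prune_empty_join(
    join: Arc<dyn ExecutionPlan>,
    build_side_rows: usize,
    probe_stage_id: usize,
    scheduler: &dyn SchedulerHandle,
) -> Result<Arc<dyn ExecutionPlan>> {
    if build_side_rows == 0 {
        scheduler.cancel_stage(probe_stage_id)?;
        // EmptyExec keeps the join's schema so parent operators are unaffected.
        return Ok(Arc::new(EmptyExec::new(join.schema())));
    }
    Ok(join)
}
```

The alternative, which matches the current behaviour, is to let the probe stage run to completion, trading extra cluster utilization for a simpler scheduler.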
