gruuya opened a new issue, #9084: URL: https://github.com/apache/arrow-datafusion/issues/9084
### Describe the bug I have been troubleshooting the TPCH-DS query 64, and have found some performance issues, seemingly stemming from `ExecutionPlan::output_partitioning`, `ExecutionPlan::equivalence_properties` and their inherent branching nature throughout various `ExecutionPlan` implementations. In particular the problem manifests by the process pinning the CPU at 100%, and getting stuck in the `EnforceDistribution` physical optimizer, which aggravates the underlying problem by inter-leaving `RepartitionExec` into the existing plan tree. ### To Reproduce #### Setup You can use the existing `tpcds_physical_q64` test, just un-ignore it and set `env_logger::init()` somewhere to capture the logs. You'll also need to set `RUST_MIN_STACK=3000000` to alleviate an unrelated problem described in https://github.com/apache/arrow-datafusion/issues/4786. #### Flamegraph Next you can optionally capture the flamegraph to get an initial sense of what's happening ```bash sudo cargo flamegraph --dev -v --test tpcds_planning -- tpcds_physical_q64 ``` I see something like [flamegraph.svg.zip](https://github.com/apache/arrow-datafusion/files/14112252/flamegraph.svg.zip). Notably, there is a large build-up of (likely under-sampled) `output_partitioning` and `equivalence_properties` calls. #### Investigation I used some counters ```rust pub struct Counter { count: std::sync::Mutex<u32>, } impl Counter { pub fn new() -> Counter { Counter { count: std::sync::Mutex::new(0), } } pub fn reset(&self) { let mut counter = self.count.lock().unwrap(); *counter = 0; } pub fn increment(&self) { let mut counter = self.count.lock().unwrap(); *counter += 1; } pub fn get(&self) -> u32 { let counter = self.count.lock().unwrap(); *counter } } use once_cell::sync::Lazy; pub static OUT_PART_COUNTER: Lazy<Arc<Counter>> = Lazy::new(|| Arc::new(Counter::new())); pub static EQ_PROP_COUNTER: Lazy<Arc<Counter>> = Lazy::new(|| Arc::new(Counter::new())); ``` and then sprinkled `crate::OUT_PART_COUNTER.increment()` and `crate::EQ_PROP_COUNTER.increment()` at the begining of `fn output_partitioning(&self)` and `fn equivalence_properties(&self)` respectively, for `MemoryExec`, `RepartitionExec`, `HashJoinExec`, `ProjectionExec` and `AggregateExec`. I also added the following ```diff diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 0c5c2d78b..d4c0ddc22 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -208,9 +208,16 @@ impl PhysicalOptimizerRule for EnforceDistribution { }; let distribution_context = DistributionContext::new_default(adjusted); + use datafusion_physical_plan::{OUT_PART_COUNTER, EQ_PROP_COUNTER}; + OUT_PART_COUNTER.reset(); + EQ_PROP_COUNTER.reset(); // Distribution enforcement needs to be applied bottom-up. let distribution_context = distribution_context.transform_up(&|distribution_context| { + log::warn!("output_partitioning {}, equivalence_properties {}", OUT_PART_COUNTER.get(), EQ_PROP_COUNTER.get()); + OUT_PART_COUNTER.reset(); + EQ_PROP_COUNTER.reset(); + ensure_distribution(distribution_context, config) })?; Ok(distribution_context.plan) ``` #### Results Soon enough after starting the test you'll notice these numbers reach values on the order of 100K and then millions and more as the slowdown in the iterations becomes appreciable ```bash [2024-01-31T13:58:01Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 40955, equivalence_properties 58363 [2024-01-31T13:58:01Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 0, equivalence_properties 0 [2024-01-31T13:58:03Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 245749, equivalence_properties 350184 [2024-01-31T13:58:03Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 81915, equivalence_properties 116731 [2024-01-31T13:58:03Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 0, equivalence_properties 0 [2024-01-31T13:58:06Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 491509, equivalence_properties 700392 [2024-01-31T13:58:07Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 163835, equivalence_properties 233467 [2024-01-31T13:58:07Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 0, equivalence_properties 0 [2024-01-31T13:58:12Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 983029, equivalence_properties 1400808 [2024-01-31T13:58:14Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 327675, equivalence_properties 466939 [2024-01-31T13:58:14Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 0, equivalence_properties 0 [2024-01-31T13:58:25Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 1966069, equivalence_properties 2801640 [2024-01-31T13:58:29Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 655355, equivalence_properties 933883 [2024-01-31T13:58:29Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 0, equivalence_properties 0 [2024-01-31T13:58:29Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 0, equivalence_properties 1 [2024-01-31T13:58:29Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 0, equivalence_properties 1 [2024-01-31T13:58:51Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 3932157, equivalence_properties 5603310 [2024-01-31T13:59:13Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 3932151, equivalence_properties 5603324 [2024-01-31T13:59:50Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 6553589, equivalence_properties 9338876 [2024-01-31T14:02:12Z WARN datafusion::physical_optimizer::enforce_distribution] output_partitioning 24903668, equivalence_properties 35487723 ``` #### Explanation WARNING: A lot of gritty details below Taking a look at one of the plans that appears early in the optimization process: ```bash [ "AggregateExec: mode=Partial, gby=[cs_item_sk@0 as cs_item_sk], aggr=[SUM(catalog_sales.cs_ext_list_price), SUM(catalog_returns.cr_refunded_cash + catalog_returns.cr_reversed_charge + catalog_returns.cr_store_credit)]", " ProjectionExec: expr=[cs_item_sk@0 as cs_item_sk, cs_ext_list_price@2 as cs_ext_list_price, cr_refunded_cash@5 as cr_refunded_cash, cr_reversed_charge@6 as cr_reversed_charge, cr_store_credit@7 as cr_store_credit]", " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(cs_item_sk@0, cr_item_sk@0), (cs_order_number@1, cr_order_number@1)]", " RepartitionExec: partitioning=Hash([cs_item_sk@0, cs_order_number@1], 12), input_partitions=0", " MemoryExec: partitions=0, partition_sizes=[]", " RepartitionExec: partitioning=Hash([cr_item_sk@0, cr_order_number@1], 12), input_partitions=0", " MemoryExec: partitions=0, partition_sizes=[]", ] ``` and exploring the relevant implementations of `output_partitioning` and `equivalence_properties`, you can see that they have the potential to branch off into calling two methods on the input, thus leading to a exponential call tree. In particular calling just `output_partitioning()` on the top-most plan leads to: - in [`AggregateExec::output_partitioning`](https://github.com/apache/arrow-datafusion/blob/15f59d9861082a4d5d39bddce63d81cc7b9fb299/datafusion/physical-plan/src/aggregates/mod.rs#L601-L608) ```rust 1 x out_part() + 2 x input.out_part() + input.eq_prop() ``` - for the `ProjectionExec` node below [this gets expanded](https://github.com/apache/arrow-datafusion/blob/15f59d9861082a4d5d39bddce63d81cc7b9fb299/datafusion/physical-plan/src/projection.rs#L175-L176) to ```rust 1 x out_part() + 2 x (out_part() + input.out_part() + input.eq_prop()) + (eq_prop() + input.eq_prop()) = 3 x out_part() + eq_prop() + 2 x input.out_part() + 3 x input.eq_prop() ``` - Subsequently, `HashJoinExec` will [expand](https://github.com/apache/arrow-datafusion/blob/15f59d9861082a4d5d39bddce63d81cc7b9fb299/datafusion/physical-plan/src/joins/symmetric_hash_join.rs#L398-L412) these calls to left and right inputs ```rust 3 x out_part() + eq_prop() + 2 x (out_part() + left.out_part() + right.out_part()) + 3 x (eq_prop() + left.eq_prop() + right.eq_prop()) = 5 x out_part() + 4 x eq_prop() + 2 x (left.out_part() + right.out_part()) + 3 x (left.eq_prop() + right.eq_prop()) ``` - Next, given that both left and right inputs above are `RepartitionExec`s which are [leaf call-nodes for `out_part()`](https://github.com/apache/arrow-datafusion/blob/15f59d9861082a4d5d39bddce63d81cc7b9fb299/datafusion/physical-plan/src/repartition/mod.rs#L449), but also expand `eq_prop` into [2 of inputs](https://github.com/apache/arrow-datafusion/blob/15f59d9861082a4d5d39bddce63d81cc7b9fb299/datafusion/physical-plan/src/repartition/mod.rs#L465-L472) methods we get ```rust 5 x out_part() + 4 x eq_prop() + 2 x (out_part() + out_part()) + 3 x (eq_prop() + l_input.eq_prop() + l_input.out_part() + eq_prop() + r_input.eq_prop() + r_input.out_part()) = 9 x out_part() + 10 x eq_prop() + 6 x input.eq_prop() + 6 x input.eq_prop() = ``` - Lastly the bottom plan nodes are `MemoryExec`s which [terminate the `eq_prop()` call chain](https://github.com/apache/arrow-datafusion/blob/15f59d9861082a4d5d39bddce63d81cc7b9fb299/datafusion/physical-plan/src/memory.rs#L125) as well ```rust 9 x out_part() + 10 x eq_prop() + 6 x eq_prop() + 6 x eq_prop() = 15 x out_part() + 16 x eq_prop() ``` So for computing 1 thing (`Partitioning`), from a total of 7 plans the total invocation count for the above two methods was 31, thus with some overlapping. Note that some of these calls involve allocating stuff on the heap as well as some other computations, which can add up when the invocation count grows substantially. Finally, given that these 2 methods are liberally called inside `ensure_distribution` and its sub-routines, I think this explains the enormous call count and ultimately the slow-down. ### Expected behavior Some potential mitigations: - [Delay calling and re-use results](https://github.com/apache/arrow-datafusion/blob/15f59d9861082a4d5d39bddce63d81cc7b9fb299/datafusion/physical-plan/src/aggregates/mod.rs#L606-L613) from `output_partitioning` and `equivalence_properties` within methods. For instance in `AggregateExec` the second call to `input.output_partitioning` is redundant and the call to `input.equivalence_properties()` can also be delayed only in case of a match (and [likewise](https://github.com/apache/arrow-datafusion/blob/15f59d9861082a4d5d39bddce63d81cc7b9fb299/datafusion/physical-plan/src/projection.rs#L176-L181) for `ProjectionExec`). - A proper solution would probably involve `EnforceDistribution` interleaving some helper plans alongside with RepartitionExec` which serve to cache/short-circuit the `output_partitioning` and `equivalence_properties` already computed for the input below. ### Additional context _No response_ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
