gruuya opened a new issue, #9084:
URL: https://github.com/apache/arrow-datafusion/issues/9084

   ### Describe the bug
   
   I have been troubleshooting TPC-DS query 64, and have found some 
performance issues, seemingly stemming from 
`ExecutionPlan::output_partitioning`, `ExecutionPlan::equivalence_properties` 
and their inherent branching nature throughout various `ExecutionPlan` 
implementations.
   
   In particular, the problem manifests as the process pinning the CPU at 100% 
and getting stuck in the `EnforceDistribution` physical optimizer, which 
aggravates the underlying problem by interleaving `RepartitionExec` nodes into 
the existing plan tree.
   
   ### To Reproduce
   
   #### Setup
   You can use the existing `tpcds_physical_q64` test: just un-ignore it and 
call `env_logger::init()` somewhere to capture the logs. You'll also need to set 
`RUST_MIN_STACK=3000000` to alleviate an unrelated problem described in 
https://github.com/apache/arrow-datafusion/issues/4786.
   
   #### Flamegraph
   Next, you can optionally capture a flamegraph to get an initial sense of 
what's happening:
   ```bash
   sudo cargo flamegraph --dev -v --test tpcds_planning -- tpcds_physical_q64
   ```
   I see something like 
[flamegraph.svg.zip](https://github.com/apache/arrow-datafusion/files/14112252/flamegraph.svg.zip).
   
   Notably, there is a large build-up of (likely under-sampled) 
`output_partitioning` and `equivalence_properties` calls.
   
   #### Investigation
   I used some counters
   ```rust
   pub struct Counter {
       count: std::sync::Mutex<u32>,
   }
   
   impl Counter {
       pub fn new() -> Counter {
           Counter {
               count: std::sync::Mutex::new(0),
           }
       }
   
       pub fn reset(&self) {
           let mut counter = self.count.lock().unwrap();
           *counter = 0;
       }
   
       pub fn increment(&self) {
           let mut counter = self.count.lock().unwrap();
           *counter += 1;
       }
   
       pub fn get(&self) -> u32 {
           let counter = self.count.lock().unwrap();
           *counter
       }
   }
   
   use once_cell::sync::Lazy;
   use std::sync::Arc;
   
   // Global counters, one per instrumented method.
   pub static OUT_PART_COUNTER: Lazy<Arc<Counter>> =
       Lazy::new(|| Arc::new(Counter::new()));
   pub static EQ_PROP_COUNTER: Lazy<Arc<Counter>> =
       Lazy::new(|| Arc::new(Counter::new()));
   ```
   
   and then sprinkled `crate::OUT_PART_COUNTER.increment()` and 
`crate::EQ_PROP_COUNTER.increment()` at the beginning of `fn 
output_partitioning(&self)` and `fn equivalence_properties(&self)` 
respectively, for `MemoryExec`, `RepartitionExec`, `HashJoinExec`, 
`ProjectionExec` and `AggregateExec`.
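   
   As an aside, the same kind of instrumentation could probably be done without 
a mutex or `once_cell`. The following is only a sketch of a lock-free variant; 
`AtomicCounter` and `OUT_PART_ATOMIC` are hypothetical names, not what was used 
to produce the numbers in this report.
   ```rust
   use std::sync::atomic::{AtomicU64, Ordering};
   
   /// Hypothetical lock-free replacement for the mutex-based `Counter`.
   pub struct AtomicCounter {
       count: AtomicU64,
   }
   
   impl AtomicCounter {
       /// `const fn`, so the counter can be placed in a plain `static`.
       pub const fn new() -> Self {
           Self { count: AtomicU64::new(0) }
       }
   
       pub fn reset(&self) {
           self.count.store(0, Ordering::Relaxed);
       }
   
       pub fn increment(&self) {
           self.count.fetch_add(1, Ordering::Relaxed);
       }
   
       pub fn get(&self) -> u64 {
           self.count.load(Ordering::Relaxed)
       }
   }
   
   // Hypothetical global, analogous to `OUT_PART_COUNTER` above.
   pub static OUT_PART_ATOMIC: AtomicCounter = AtomicCounter::new();
   ```
   `Ordering::Relaxed` should be enough here because the counter is only used 
for statistics and does not synchronize any other data.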
   
   I also added the following
   ```diff
   diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
   index 0c5c2d78b..d4c0ddc22 100644
   --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
   +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
   @@ -208,9 +208,16 @@ impl PhysicalOptimizerRule for EnforceDistribution {
            };
    
            let distribution_context = DistributionContext::new_default(adjusted);
   +        use datafusion_physical_plan::{OUT_PART_COUNTER, EQ_PROP_COUNTER};
   +        OUT_PART_COUNTER.reset();
   +        EQ_PROP_COUNTER.reset();
            // Distribution enforcement needs to be applied bottom-up.
            let distribution_context =
                distribution_context.transform_up(&|distribution_context| {
   +                log::warn!("output_partitioning {}, equivalence_properties {}", OUT_PART_COUNTER.get(), EQ_PROP_COUNTER.get());
   +                OUT_PART_COUNTER.reset();
   +                EQ_PROP_COUNTER.reset();
   +
                    ensure_distribution(distribution_context, config)
                })?;
            Ok(distribution_context.plan)
   ```
   
   #### Results
   Soon after starting the test you'll notice these numbers reaching the order 
of 100K, then millions and more, as the slowdown between iterations becomes 
appreciable:
   ```bash
   [2024-01-31T13:58:01Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 40955, equivalence_properties 58363
   [2024-01-31T13:58:01Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 0, equivalence_properties 0
   [2024-01-31T13:58:03Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 245749, equivalence_properties 350184
   [2024-01-31T13:58:03Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 81915, equivalence_properties 116731
   [2024-01-31T13:58:03Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 0, equivalence_properties 0
   [2024-01-31T13:58:06Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 491509, equivalence_properties 700392
   [2024-01-31T13:58:07Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 163835, equivalence_properties 233467
   [2024-01-31T13:58:07Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 0, equivalence_properties 0
   [2024-01-31T13:58:12Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 983029, equivalence_properties 1400808
   [2024-01-31T13:58:14Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 327675, equivalence_properties 466939
   [2024-01-31T13:58:14Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 0, equivalence_properties 0
   [2024-01-31T13:58:25Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 1966069, equivalence_properties 2801640
   [2024-01-31T13:58:29Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 655355, equivalence_properties 933883
   [2024-01-31T13:58:29Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 0, equivalence_properties 0
   [2024-01-31T13:58:29Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 0, equivalence_properties 1
   [2024-01-31T13:58:29Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 0, equivalence_properties 1
   [2024-01-31T13:58:51Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 3932157, equivalence_properties 5603310
   [2024-01-31T13:59:13Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 3932151, equivalence_properties 5603324
   [2024-01-31T13:59:50Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 6553589, equivalence_properties 9338876
   [2024-01-31T14:02:12Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 24903668, equivalence_properties 35487723
   ```
   
   #### Explanation
   WARNING: A lot of gritty details below
   Taking a look at one of the plans that appears early in the optimization 
process:
   ```bash
   [
       "AggregateExec: mode=Partial, gby=[cs_item_sk@0 as cs_item_sk], aggr=[SUM(catalog_sales.cs_ext_list_price), SUM(catalog_returns.cr_refunded_cash + catalog_returns.cr_reversed_charge + catalog_returns.cr_store_credit)]",
       "  ProjectionExec: expr=[cs_item_sk@0 as cs_item_sk, cs_ext_list_price@2 as cs_ext_list_price, cr_refunded_cash@5 as cr_refunded_cash, cr_reversed_charge@6 as cr_reversed_charge, cr_store_credit@7 as cr_store_credit]",
       "    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(cs_item_sk@0, cr_item_sk@0), (cs_order_number@1, cr_order_number@1)]",
       "      RepartitionExec: partitioning=Hash([cs_item_sk@0, cs_order_number@1], 12), input_partitions=0",
       "        MemoryExec: partitions=0, partition_sizes=[]",
       "      RepartitionExec: partitioning=Hash([cr_item_sk@0, cr_order_number@1], 12), input_partitions=0",
       "        MemoryExec: partitions=0, partition_sizes=[]",
   ]
   ```
   and exploring the relevant implementations of `output_partitioning` and 
`equivalence_properties`, you can see that they have the potential to branch 
off into calling two methods on the input, thus leading to an exponential call 
tree.
   
   In particular calling just `output_partitioning()` on the top-most plan 
leads to:
   - in 
[`AggregateExec::output_partitioning`](https://github.com/apache/arrow-datafusion/blob/15f59d9861082a4d5d39bddce63d81cc7b9fb299/datafusion/physical-plan/src/aggregates/mod.rs#L601-L608)
 
       ```rust
       1 x out_part() + 2 x input.out_part() + input.eq_prop()
       ```
   - for the `ProjectionExec` node below [this gets 
expanded](https://github.com/apache/arrow-datafusion/blob/15f59d9861082a4d5d39bddce63d81cc7b9fb299/datafusion/physical-plan/src/projection.rs#L175-L176)
 to 
       ```rust
       1 x out_part() + 2 x (out_part() + input.out_part() + input.eq_prop()) + 
(eq_prop() + input.eq_prop()) = 
       3 x out_part() + eq_prop() + 2 x input.out_part() + 3 x input.eq_prop()
       ```
   - Subsequently, `HashJoinExec` will 
[expand](https://github.com/apache/arrow-datafusion/blob/15f59d9861082a4d5d39bddce63d81cc7b9fb299/datafusion/physical-plan/src/joins/symmetric_hash_join.rs#L398-L412)
 these calls to left and right inputs
       ```rust
       3 x out_part() + eq_prop() + 2 x (out_part() + left.out_part() + 
right.out_part()) + 3 x (eq_prop() + left.eq_prop() + right.eq_prop()) = 
       5 x out_part() + 4 x eq_prop() + 2 x (left.out_part() + 
right.out_part()) + 3 x (left.eq_prop() + right.eq_prop())
       ```
   - Next, given that both left and right inputs above are `RepartitionExec`s, 
which are [leaf call-nodes for 
`out_part()`](https://github.com/apache/arrow-datafusion/blob/15f59d9861082a4d5d39bddce63d81cc7b9fb299/datafusion/physical-plan/src/repartition/mod.rs#L449)
 but also expand `eq_prop()` into [two calls on their 
input](https://github.com/apache/arrow-datafusion/blob/15f59d9861082a4d5d39bddce63d81cc7b9fb299/datafusion/physical-plan/src/repartition/mod.rs#L465-L472),
 we get
       ```rust
       5 x out_part() + 4 x eq_prop() + 2 x (out_part() + out_part()) + 3 x 
(eq_prop() + l_input.eq_prop() + l_input.out_part() + eq_prop() + 
r_input.eq_prop() + r_input.out_part()) =
       9 x out_part() + 10 x eq_prop() + 6 x input.out_part() + 6 x 
input.eq_prop()
       ```
   - Lastly the bottom plan nodes are `MemoryExec`s which [terminate the 
`eq_prop()` call 
chain](https://github.com/apache/arrow-datafusion/blob/15f59d9861082a4d5d39bddce63d81cc7b9fb299/datafusion/physical-plan/src/memory.rs#L125)
 as well
       ```rust
       9 x out_part() + 10 x eq_prop() + 6 x out_part() + 6 x eq_prop() =
       15 x out_part() + 16 x eq_prop()
       ```
   
   So to compute a single thing (`Partitioning`) for a tree of 7 plan nodes, 
the two methods above were invoked 31 times in total, with considerable 
overlap between the calls. Note that some of these calls involve heap 
allocations as well as other computations, which can add up when the 
invocation count grows substantially.
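   
   The tally above can be double-checked with a small stand-alone model. This 
is only a sketch: `Plan`, `Calls`, `example_plan` and `count_calls` are 
hypothetical toy names rather than DataFusion types, and the call patterns are 
hard-coded from the derivation above (the `Aggregate` arm of `eq_prop` is an 
assumption and is never reached in this example).
   ```rust
   /// Toy stand-ins for the five plan types in the example; not DataFusion types.
   enum Plan {
       Aggregate(Box<Plan>),
       Projection(Box<Plan>),
       HashJoin(Box<Plan>, Box<Plan>),
       Repartition(Box<Plan>),
       Memory,
   }
   
   /// Number of invocations of each method across the whole tree.
   #[derive(Default, Debug, PartialEq)]
   struct Calls {
       out_part: u64,
       eq_prop: u64,
   }
   
   /// Models `output_partitioning`: counts itself, then the calls it makes on inputs.
   fn out_part(plan: &Plan, calls: &mut Calls) {
       calls.out_part += 1;
       match plan {
           Plan::Aggregate(input) => {
               out_part(input, calls);
               out_part(input, calls);
               eq_prop(input, calls);
           }
           Plan::Projection(input) => {
               out_part(input, calls);
               eq_prop(input, calls);
           }
           Plan::HashJoin(left, right) => {
               out_part(left, calls);
               out_part(right, calls);
           }
           // Leaf call-nodes for `out_part()`.
           Plan::Repartition(_) | Plan::Memory => {}
       }
   }
   
   /// Models `equivalence_properties` in the same way.
   fn eq_prop(plan: &Plan, calls: &mut Calls) {
       calls.eq_prop += 1;
       match plan {
           // The `Aggregate` behaviour here is assumed; it is not exercised below.
           Plan::Aggregate(input) | Plan::Projection(input) => eq_prop(input, calls),
           Plan::HashJoin(left, right) => {
               eq_prop(left, calls);
               eq_prop(right, calls);
           }
           Plan::Repartition(input) => {
               eq_prop(input, calls);
               out_part(input, calls);
           }
           Plan::Memory => {}
       }
   }
   
   /// The 7-node plan shown above.
   fn example_plan() -> Plan {
       let side = || Box::new(Plan::Repartition(Box::new(Plan::Memory)));
       let join = Plan::HashJoin(side(), side());
       Plan::Aggregate(Box::new(Plan::Projection(Box::new(join))))
   }
   
   /// Counts every call triggered by a single `out_part()` on the root.
   fn count_calls(plan: &Plan) -> Calls {
       let mut calls = Calls::default();
       out_part(plan, &mut calls);
       calls
   }
   ```
   Calling `count_calls(&example_plan())` yields 15 `out_part` and 16 `eq_prop` 
invocations, matching the 31 calls derived by hand.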
   
   Finally, given that these 2 methods are liberally called inside 
`ensure_distribution` and its sub-routines, I think this explains the enormous 
call count and ultimately the slow-down.
   
   ### Expected behavior
   
   Some potential mitigations:
   
   - [Delay calling and re-use 
results](https://github.com/apache/arrow-datafusion/blob/15f59d9861082a4d5d39bddce63d81cc7b9fb299/datafusion/physical-plan/src/aggregates/mod.rs#L606-L613)
 from `output_partitioning` and `equivalence_properties` within methods. For 
instance, in `AggregateExec` the second call to `input.output_partitioning()` 
is redundant, and the call to `input.equivalence_properties()` can be delayed 
so that it happens only in case of a match (and 
[likewise](https://github.com/apache/arrow-datafusion/blob/15f59d9861082a4d5d39bddce63d81cc7b9fb299/datafusion/physical-plan/src/projection.rs#L176-L181)
 for `ProjectionExec`).
   - A proper solution would probably involve `EnforceDistribution` 
interleaving some helper plans alongside `RepartitionExec`, which serve to 
cache/short-circuit the `output_partitioning` and `equivalence_properties` 
already computed for the input below.
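   
   To illustrate the caching idea in isolation, here is a minimal sketch of a 
node that memoizes a derived property. `CachedNode`, `output_partitions` and 
`chain` are hypothetical names, and the "partitioning" is reduced to a plain 
`usize`; this is not a proposal for the actual `ExecutionPlan` API.
   ```rust
   use std::cell::{Cell, OnceCell};
   
   /// Hypothetical plan node that computes a derived property at most once.
   struct CachedNode {
       inputs: Vec<CachedNode>,
       local_partitions: usize,
       cached_partitions: OnceCell<usize>,
       /// How many times the (expensive) computation actually ran on this node.
       computations: Cell<usize>,
   }
   
   impl CachedNode {
       fn new(local_partitions: usize, inputs: Vec<CachedNode>) -> Self {
           Self {
               inputs,
               local_partitions,
               cached_partitions: OnceCell::new(),
               computations: Cell::new(0),
           }
       }
   
       /// Returns the cached value, computing it on first use only.
       fn output_partitions(&self) -> usize {
           *self.cached_partitions.get_or_init(|| {
               self.computations.set(self.computations.get() + 1);
               let mut result = self.local_partitions;
               for input in &self.inputs {
                   // Ask twice on purpose, like the redundant calls described above.
                   let first = input.output_partitions();
                   let second = input.output_partitions();
                   result = result.max(first).max(second);
               }
               result
           })
       }
   
       /// Sum of actual computations over this node and everything below it.
       fn total_computations(&self) -> usize {
           self.computations.get()
               + self
                   .inputs
                   .iter()
                   .map(CachedNode::total_computations)
                   .sum::<usize>()
       }
   }
   
   /// Builds a linear chain of `depth` nodes (assumes `depth >= 1`).
   fn chain(depth: usize) -> CachedNode {
       let mut node = CachedNode::new(1, vec![]);
       for level in 1..depth {
           node = CachedNode::new(level + 1, vec![node]);
       }
       node
   }
   ```
   With the cache, a 5-node chain runs the computation once per node even 
though every node queries its input twice; without it, the same doubled calls 
would run 1 + 2 + 4 + 8 + 16 = 31 times. In a real plan the cache would need to 
survive, or be rebuilt by, optimizer rewrites; that part is not addressed here.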
   
   ### Additional context
   
   _No response_

