GitHub user Samyak2 edited a discussion: `repartitioned` method doesn't work for changing partitioning of a subtree after planning
Consider this code:

Note: [better example in the comments below](https://github.com/apache/datafusion/discussions/18924#discussioncomment-15077000)

```rust
use std::sync::Arc;

use datafusion::{
    config::ConfigOptions,
    error::DataFusionError,
    physical_plan::{ExecutionPlan, displayable},
    prelude::*,
};

/// Recursively ask every node in the plan to repartition itself to 4 partitions.
fn repartition_subtree(
    plan: Arc<dyn ExecutionPlan>,
    config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
    let new_children = plan
        .children()
        .into_iter()
        .map(|child| repartition_subtree(Arc::clone(child), config))
        .collect::<Result<_, _>>()?;
    let plan = plan.with_new_children(new_children)?;

    // change partitions to 4
    let repartitioned_plan = plan.repartitioned(4, config)?;
    if let Some(new_plan) = repartitioned_plan {
        Ok(new_plan)
    } else {
        Ok(plan)
    }
}

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let config = SessionConfig::new().with_target_partitions(2);

    // register the table
    let ctx = SessionContext::new_with_config(config);
    ctx.register_parquet(
        "example",
        "<path to datafusion repo>/datafusion/core/tests/data/test_statistics_per_partition/",
        ParquetReadOptions::new(),
    )
    .await?;

    // create a plan to run a SQL query
    let df = ctx
        .sql("SELECT id, MIN(0) FROM example GROUP BY id LIMIT 100")
        .await?;
    let plan = df.create_physical_plan().await.unwrap();
    println!("plan: {}", displayable(plan.as_ref()).indent(false));

    let state = ctx.state();
    let new_plan = repartition_subtree(plan, state.config_options()).unwrap();
    println!("new_plan: {}", displayable(new_plan.as_ref()).indent(false));

    Ok(())
}
```

The output is this:

```
plan: CoalescePartitionsExec: fetch=100
  AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[min(Int64(0))]
    CoalesceBatchesExec: target_batch_size=8192
      RepartitionExec: partitioning=Hash([id@0], 2), input_partitions=2
        AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[min(Int64(0))]
          DataSourceExec: file_groups={2 groups: [[<path>/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, <path>/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [<path>/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, <path>/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet

new_plan: CoalescePartitionsExec: fetch=100
  AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[min(Int64(0))]
    CoalesceBatchesExec: target_batch_size=8192
      RepartitionExec: partitioning=Hash([id@0], 4), input_partitions=2
        AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[min(Int64(0))]
          DataSourceExec: file_groups={2 groups: [[<path>/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, <path>/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [<path>/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, <path>/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet
```

I plan the query with `target_partitions` set to 2, but then I want to increase the partitioning of a subtree to 4. I do that by recursively calling `.repartitioned` on each of the physical plan nodes.

Notice that the number of `file_groups` remains the same before and after! Only the partitioning of `RepartitionExec` has changed.
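To make the symptom easier to spot without reading the full plan text, a small helper along these lines can walk the tree and print each node's output partition count. This is just a sketch, assuming a recent DataFusion where the `ExecutionPlanProperties` trait exposes `output_partitioning()` and the trait has a `name()` method:

```rust
use std::sync::Arc;

use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};

/// Print every node's output partition count, indented by depth.
/// After `repartition_subtree`, the `DataSourceExec` leaf still reports
/// the old partition count because its file groups were never re-split.
fn print_partition_counts(plan: &Arc<dyn ExecutionPlan>, depth: usize) {
    println!(
        "{:indent$}{}: {} partitions",
        "",
        plan.name(),
        plan.output_partitioning().partition_count(),
        indent = depth * 2
    );
    for child in plan.children() {
        print_partition_counts(child, depth + 1);
    }
}
```

Calling `print_partition_counts(&new_plan, 0)` right after the rewrite would show the leaf scan still stuck at 2 partitions.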
It seems to be because of this code: https://github.com/apache/datafusion/blob/d24eb4a23156b7814836e765d5890186ab40682f/datafusion/datasource/src/file_groups.rs#L192-L199

Even though I'm scanning entire files in this case, a range is set on these file groups, so repartitioning is skipped!

GitHub link: https://github.com/apache/datafusion/discussions/18924
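The linked guard in `FileGroupPartitioner::repartition_file_groups` is roughly the following (a paraphrase of that revision, not a verbatim quote, so exact names may differ slightly):

```rust
// Paraphrase of the linked lines in datafusion/datasource/src/file_groups.rs.
// Redistribution is only attempted when all files are read start-to-end:
let has_ranges = file_groups
    .iter()
    .flat_map(|group| group.iter())
    .any(|file| file.range.is_some());
if has_ranges {
    // Any file with a `range` set makes the partitioner bail out,
    // even when that range happens to cover the whole file.
    return None;
}
```

So once planning has attached byte ranges to the scanned files, later calls to `repartitioned` on the `DataSourceExec` become no-ops, which matches the unchanged `file_groups` output above.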
