GitHub user Samyak2 edited a discussion: `repartitioned` method doesn't work for changing partitioning of a subtree after planning

Consider this code:

Note: [better example in the comments below](https://github.com/apache/datafusion/discussions/18924#discussioncomment-15077000)

```rust
use std::sync::Arc;

use datafusion::{
    config::ConfigOptions,
    error::DataFusionError,
    physical_plan::{ExecutionPlan, displayable},
    prelude::*,
};

fn repartition_subtree(
    plan: Arc<dyn ExecutionPlan>,
    config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
    let new_children = plan
        .children()
        .into_iter()
        .map(|child| repartition_subtree(Arc::clone(child), config))
        .collect::<Result<_, _>>()?;
    let plan = plan.with_new_children(new_children)?;

    // change partitions to 4
    let repartitioned_plan = plan.repartitioned(4, config)?;
    if let Some(new_plan) = repartitioned_plan {
        Ok(new_plan)
    } else {
        Ok(plan)
    }
}

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let config = SessionConfig::new().with_target_partitions(2);

    // register the table
    let ctx = SessionContext::new_with_config(config);
    ctx.register_parquet(
        "example",
        "<path to datafusion repo>/datafusion/core/tests/data/test_statistics_per_partition/",
        ParquetReadOptions::new(),
    )
    .await?;

    // create a plan to run a SQL query
    let df = ctx
        .sql("SELECT id, MIN(0) FROM example GROUP BY id LIMIT 100")
        .await?;

    let plan = df.create_physical_plan().await?;

    println!("plan: {}", displayable(plan.as_ref()).indent(false));

    let state = ctx.state();

    let new_plan = repartition_subtree(plan, state.config_options())?;
    println!("new_plan: {}", displayable(new_plan.as_ref()).indent(false));

    Ok(())
}
```

The output is this:
```
plan: CoalescePartitionsExec: fetch=100
  AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[min(Int64(0))]
    CoalesceBatchesExec: target_batch_size=8192
      RepartitionExec: partitioning=Hash([id@0], 2), input_partitions=2
        AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[min(Int64(0))]
          DataSourceExec: file_groups={2 groups: [[<path>/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, <path>/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [<path>/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, <path>/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet

new_plan: CoalescePartitionsExec: fetch=100
  AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[min(Int64(0))]
    CoalesceBatchesExec: target_batch_size=8192
      RepartitionExec: partitioning=Hash([id@0], 4), input_partitions=2
        AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[min(Int64(0))]
          DataSourceExec: file_groups={2 groups: [[<path>/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, <path>/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [<path>/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, <path>/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet
```

I plan the query with `target_partitions` set to 2. Then I want to increase the partitioning of a subtree to `4`, which I do by recursively calling `.repartitioned` on each of the physical plan nodes.

Notice that the number of `file_groups` remains the same before and after! Only 
the partitioning of `RepartitionExec` has changed.
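The following toy model illustrates why this can happen silently. It does not use DataFusion's types; `Node`, `accepts`, and this `repartitioned` are hypothetical stand-ins that only mimic the shape of `ExecutionPlan::repartitioned`, where returning `None` means "no change". With that contract, a node that declines keeps its old partition count and the bottom-up rewrite simply falls back to the original node, so no error surfaces:

```rust
// Toy model, NOT DataFusion's types: `Node` and `accepts` are hypothetical.
// Each node either accepts a new partition count (`Some`) or declines
// (`None`), mirroring the `Option` returned by `ExecutionPlan::repartitioned`.
#[derive(Debug, Clone)]
struct Node {
    name: &'static str,
    partitions: usize,
    accepts: bool, // whether this node is willing to change its partitioning
    children: Vec<Node>,
}

impl Node {
    fn repartitioned(&self, target: usize) -> Option<Node> {
        if self.accepts {
            Some(Node { partitions: target, ..self.clone() })
        } else {
            None // "no change": the caller keeps the old node
        }
    }
}

// Bottom-up rewrite with the same shape as `repartition_subtree` in the post.
fn repartition_subtree(node: Node, target: usize) -> Node {
    let Node { name, partitions, accepts, children } = node;
    let children = children
        .into_iter()
        .map(|child| repartition_subtree(child, target))
        .collect();
    let node = Node { name, partitions, accepts, children };
    // Fall back to the unchanged node when it declines.
    node.repartitioned(target).unwrap_or(node)
}

fn main() {
    // Roughly the RepartitionExec -> DataSourceExec part of the plan above.
    let scan = Node { name: "DataSourceExec", partitions: 2, accepts: false, children: vec![] };
    let plan = Node { name: "RepartitionExec", partitions: 2, accepts: true, children: vec![scan] };
    let new_plan = repartition_subtree(plan, 4);
    println!("{}: {}", new_plan.name, new_plan.partitions);
    println!("{}: {}", new_plan.children[0].name, new_plan.children[0].partitions);
}
```

The design point is that an `Option` return cannot distinguish "cannot repartition" from "chose not to", so the caller has to inspect the result itself if it needs the change to actually happen.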

It seems to be because of this code: 
https://github.com/apache/datafusion/blob/d24eb4a23156b7814836e765d5890186ab40682f/datafusion/datasource/src/file_groups.rs#L192-L199

Even though I'm scanning entire files in this case, a range is set on the files in these file groups, so repartitioning is skipped!
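A simplified model of the guard described above may make the problem concrete. This is not DataFusion's code: `FileSlice`, `can_repartition`, and `clear_full_file_ranges` are hypothetical names, and ranges are modeled as plain `(start, end)` byte offsets. The model shows that any explicit range blocks repartitioning, even one covering the whole file, and sketches one possible workaround (dropping full-file ranges) whose safety inside DataFusion is not established here:

```rust
// Simplified model, NOT DataFusion's code: all names here are hypothetical.
#[derive(Debug, Clone, PartialEq)]
struct FileSlice {
    size: u64,
    range: Option<(u64, u64)>, // byte range to scan; `None` = whole file
}

// The guard described above: any explicit range disables repartitioning,
// even a range that happens to cover the entire file.
fn can_repartition(groups: &[Vec<FileSlice>]) -> bool {
    !groups.iter().flatten().any(|f| f.range.is_some())
}

// Possible workaround (untested against DataFusion): forget ranges that
// span the whole file, so full-file scans pass the guard again.
fn clear_full_file_ranges(groups: &mut [Vec<FileSlice>]) {
    for f in groups.iter_mut().flatten() {
        if f.range == Some((0, f.size)) {
            f.range = None;
        }
    }
}

fn main() {
    // Four files in two groups, each carrying a full-file range as the post reports.
    let full = FileSlice { size: 100, range: Some((0, 100)) };
    let mut groups = vec![
        vec![full.clone(), full.clone()],
        vec![full.clone(), full],
    ];
    println!("before: can_repartition = {}", can_repartition(&groups));
    clear_full_file_ranges(&mut groups);
    println!("after:  can_repartition = {}", can_repartition(&groups));
}
```

Partial ranges are left untouched by the workaround, so genuinely split files would still (correctly) block further repartitioning in this model.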

GitHub link: https://github.com/apache/datafusion/discussions/18924
