GitHub user Samyak2 edited a discussion: `repartitioned` method doesn't work to change partitioning of a subtree after planning

Consider this code:
```rust
use std::sync::Arc;

use datafusion::{
    config::ConfigOptions,
    error::DataFusionError,
    physical_plan::{ExecutionPlan, displayable},
    prelude::*,
};

fn repartition_subtree(
    plan: Arc<dyn ExecutionPlan>,
    config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
    // Recurse into the children first so the whole subtree is visited.
    let new_children = plan
        .children()
        .into_iter()
        .map(|child| repartition_subtree(Arc::clone(child), config))
        .collect::<Result<_, _>>()?;
    let plan = plan.with_new_children(new_children)?;

    // change partitions to 4
    let repartitioned_plan = plan.repartitioned(4, config)?;
    if let Some(new_plan) = repartitioned_plan {
        Ok(new_plan)
    } else {
        // `repartitioned` returned None: keep the node unchanged
        Ok(plan)
    }
}

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let config = SessionConfig::new().with_target_partitions(2);

    // register the table
    let ctx = SessionContext::new_with_config(config);
    ctx.register_parquet(
        "example",
        "<path to datafusion repo>/datafusion/core/tests/data/test_statistics_per_partition/",
        ParquetReadOptions::new(),
    )
    .await?;

    // create a plan to run a SQL query
    let df = ctx
        .sql("SELECT id, MIN(0) FROM example GROUP BY id LIMIT 100")
        .await?;
    let plan = df.create_physical_plan().await.unwrap();
    println!("plan: {}", displayable(plan.as_ref()).indent(false));

    let state = ctx.state();
    let new_plan = repartition_subtree(plan, state.config_options()).unwrap();
    println!("new_plan: {}", displayable(new_plan.as_ref()).indent(false));

    Ok(())
}
```
The output is this:
```
plan: CoalescePartitionsExec: fetch=100
  AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[min(Int64(0))]
    CoalesceBatchesExec: target_batch_size=8192
      RepartitionExec: partitioning=Hash([id@0], 2), input_partitions=2
        AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[min(Int64(0))]
          DataSourceExec: file_groups={2 groups: [[<path>/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, <path>/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [<path>/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, <path>/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet

new_plan: CoalescePartitionsExec: fetch=100
  AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[min(Int64(0))]
    CoalesceBatchesExec: target_batch_size=8192
      RepartitionExec: partitioning=Hash([id@0], 4), input_partitions=2
        AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[min(Int64(0))]
          DataSourceExec: file_groups={2 groups: [[<path>/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, <path>/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [<path>/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, <path>/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet
```
I plan the query with `target_partitions` set to 2, but then I want to increase the partitioning of a subtree to `4`. I do that by recursively calling `.repartitioned` on each node of the physical plan.

Notice that the number of `file_groups` stays the same before and after! Only the partitioning of the `RepartitionExec` has changed.
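
As a sanity check, a minimal variant of the function above can print, per node, whether `repartitioned` returned `Some` or `None`. This is only a sketch: the name `repartition_subtree_verbose` is made up here, and it assumes `ExecutionPlan::name()` is available in the DataFusion version being used.

```rust
use std::sync::Arc;

use datafusion::{
    config::ConfigOptions,
    error::DataFusionError,
    physical_plan::ExecutionPlan,
};

// Sketch: same recursion as above, but also reports whether each node
// honored the repartition request.
fn repartition_subtree_verbose(
    plan: Arc<dyn ExecutionPlan>,
    config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
    let new_children = plan
        .children()
        .into_iter()
        .map(|child| repartition_subtree_verbose(Arc::clone(child), config))
        .collect::<Result<_, _>>()?;
    let plan = plan.with_new_children(new_children)?;

    match plan.repartitioned(4, config)? {
        Some(new_plan) => {
            println!("{}: repartitioned to 4 partitions", new_plan.name());
            Ok(new_plan)
        }
        None => {
            println!("{}: repartitioned returned None, node kept as-is", plan.name());
            Ok(plan)
        }
    }
}
```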
It seems to be because of this code:
https://github.com/apache/datafusion/blob/d24eb4a23156b7814836e765d5890186ab40682f/datafusion/datasource/src/file_groups.rs#L192-L199
Even though I'm scanning entire files in this case, a range is set on these file groups, so repartitioning is skipped!
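
To illustrate that behavior, here is a simplified, self-contained sketch of the bail-out (the types and function below are hypothetical stand-ins, not DataFusion's actual code): as soon as any file in any group carries a byte range, regrouping is skipped, even if the range covers the whole file.

```rust
// Hypothetical, simplified model of the linked check; not DataFusion's
// actual types or code.
#[derive(Clone, Debug)]
struct FileRange {
    start: i64,
    end: i64,
}

#[derive(Clone, Debug)]
struct PartitionedFile {
    path: String,
    /// Byte range to scan; `Some` even when it spans the whole file.
    range: Option<FileRange>,
}

/// Returns `None` ("cannot repartition") as soon as any file already has a
/// range set, mirroring the behavior observed above.
fn repartition_file_groups(
    file_groups: &[Vec<PartitionedFile>],
    target_partitions: usize,
) -> Option<Vec<Vec<PartitionedFile>>> {
    let has_ranges = file_groups.iter().flatten().any(|f| f.range.is_some());
    if has_ranges {
        // Early return: this is why the file_groups above stay at 2 groups
        // even though 4 partitions were requested.
        return None;
    }
    // Actual regrouping into `target_partitions` groups elided.
    let _ = target_partitions;
    Some(file_groups.to_vec())
}

fn main() {
    let groups = vec![vec![PartitionedFile {
        path: "date=2025-03-01/j5fUeSDQo22oPyPU.parquet".to_string(),
        range: Some(FileRange { start: 0, end: 1024 }),
    }]];
    assert!(repartition_file_groups(&groups, 4).is_none());
    println!("repartitioning skipped because a range is already set");
}
```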
GitHub link: https://github.com/apache/datafusion/discussions/18924