GitHub user Samyak2 added a comment to the discussion: `repartitioned` method 
doesn't work for changing partitioning of a subtree after planning

Here's a better example:

```rust
use std::sync::Arc;

use datafusion::{
    config::ConfigOptions,
    error::DataFusionError,
    physical_plan::{ExecutionPlan, displayable},
    prelude::*,
};

/// Rebuilds `plan` bottom-up, asking every node to repartition itself.
///
/// Each child subtree is processed recursively first, the current node is then
/// recreated on top of the processed children via `with_new_children`, and
/// finally `repartitioned(4, config)` is called on that node. When the node
/// yields a repartitioned replacement it is returned; otherwise the node with
/// its new children is returned unchanged.
///
/// # Errors
///
/// Returns the first `DataFusionError` produced by a recursive call, by
/// `with_new_children`, or by `repartitioned`.
fn repartition_subtree(
    plan: Arc<dyn ExecutionPlan>,
    config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
    // Process the children before the node itself.
    let mut rewritten_children = Vec::new();
    for child in plan.children() {
        rewritten_children.push(repartition_subtree(Arc::clone(child), config)?);
    }
    let node = plan.with_new_children(rewritten_children)?;

    // change partitions to 4; fall back to the node as-is when it reports
    // that no repartitioned variant is available.
    let replacement = node.repartitioned(4, config)?;
    Ok(replacement.unwrap_or(node))
}

/// Plans a `GROUP BY ... LIMIT` query over the `example` parquet table with
/// `target_partitions` set to 2, then prints the physical plan before and
/// after passing it through `repartition_subtree`.
///
/// # Errors
///
/// Returns any error raised while registering the table, planning the query,
/// or repartitioning the plan.
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let config = SessionConfig::new().with_target_partitions(2);

    // register the table
    let ctx = SessionContext::new_with_config(config);
    ctx.register_parquet(
        "example",
        "./partitioned_output",
        ParquetReadOptions::new(),
    )
    .await?;

    // create a plan to run a SQL query
    let df = ctx
        .sql("SELECT b, MIN(0) FROM example GROUP BY b LIMIT 100")
        .await?;

    // `main` already returns a DataFusion `Result`, so propagate planning
    // failures with `?` instead of panicking.
    let plan = df.create_physical_plan().await?;

    println!("plan: {}", displayable(plan.as_ref()).indent(false));

    let state = ctx.state();

    let new_plan = repartition_subtree(plan, state.config_options())?;
    println!("new_plan: {}", displayable(new_plan.as_ref()).indent(false));

    Ok(())
}
```

Here's the code to generate the data (excuse the code quality, it was written 
with AI):
```rust
use std::sync::Arc;

use arrow::array::{Int32Array, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::prelude::*;
use rand::Rng;

/// Total number of rows to generate across all batches.
const NUM_ROWS: usize = 10_000_000;
/// Number of rows generated per record batch.
const BATCH_SIZE: usize = 1_000_000; // Process in batches to manage memory

/// Generates `NUM_ROWS` random rows with columns `a` (Int32) and `b` (Int64),
/// prints a ten-row sample and the total row count, then writes the data as
/// parquet files under `./partitioned_output`, partitioned by column `a`.
///
/// # Errors
///
/// Returns any error raised while building the record batches, creating or
/// querying the DataFrame, or writing the parquet output.
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    println!("Creating DataFusion context...");
    let ctx = SessionContext::new();

    // Define schema
    let fields = vec![
        Field::new("a", DataType::Int32, false), // 0 to 1000
        Field::new("b", DataType::Int64, false), // 0 to 1,000,000
    ];
    let schema = Arc::new(Schema::new(fields));

    println!("Generating {} random rows...", NUM_ROWS);

    // Generate data in batches to avoid memory issues; collecting into a
    // `Result` stops at the first batch that fails to build.
    let mut rng = rand::thread_rng();
    let batch_count = NUM_ROWS / BATCH_SIZE;
    let batches = (0..batch_count)
        .map(|batch_idx| {
            let first_row = batch_idx * BATCH_SIZE;
            println!(
                "  Generating batch {} ({} - {} rows)...",
                batch_idx + 1,
                first_row,
                first_row + BATCH_SIZE
            );

            // Generate random data for column 'a' (0 to 1000)
            let values_a: Vec<i32> = (0..BATCH_SIZE)
                .map(|_| rng.gen_range(0..=1000))
                .collect();

            // Generate random data for column 'b' (0 to 1,000,000)
            let values_b: Vec<i64> = (0..BATCH_SIZE)
                .map(|_| rng.gen_range(0..=1_000_000))
                .collect();

            RecordBatch::try_new(
                Arc::clone(&schema),
                vec![
                    Arc::new(Int32Array::from(values_a)),
                    Arc::new(Int64Array::from(values_b)),
                ],
            )
        })
        .collect::<Result<Vec<_>, _>>()?;

    println!("Creating DataFrame from {} batches...", batches.len());

    // Create a DataFrame from the record batches
    let df = ctx.read_batches(batches)?;

    // Show sample data
    println!("\nSample data (first 10 rows):");
    let preview = df.clone().limit(0, Some(10))?;
    preview.show().await?;

    // Show row count
    let row_count = df.clone().count().await?;
    println!("\nTotal rows: {}", row_count);

    // Write as partitioned parquet files (partitioned by column 'a')
    let output_path = "./partitioned_output";
    println!(
        "\nWriting partitioned parquet files to '{}'...",
        output_path
    );
    println!("Partitioning by column 'a'...");

    let write_options =
        DataFrameWriteOptions::new().with_partition_by(vec![String::from("a")]);
    // `None` selects the default parquet writer options.
    df.write_parquet(output_path, write_options, None).await?;

    println!("\n✓ Successfully wrote partitioned parquet files!");
    println!("  Output directory: {}", output_path);

    Ok(())
}
```

With the latest main of datafusion (commit 
`48cc4c8af3a5ad500a44c8625ea44d6b4827af1e`), we get this:
```
plan: CoalescePartitionsExec: fetch=100
  AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[min(Int64(0))]
    CoalesceBatchesExec: target_batch_size=8192
      RepartitionExec: partitioning=Hash([b@0], 2), input_partitions=2
        AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[min(Int64(0))]
          DataSourceExec: file_groups={2 groups: [[<removed for brevity>]]}, 
projection=[b], file_type=parquet

new_plan: CoalescePartitionsExec: fetch=100
  AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[min(Int64(0))]
    CoalesceBatchesExec: target_batch_size=8192
      RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=2
        AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[min(Int64(0))]
          DataSourceExec: file_groups={2 groups: [[<removed for brevity>]]}, 
projection=[b], file_type=parquet
```

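The partitioning of the `DataSourceExec` remains the same (still 2 file groups); only the `RepartitionExec` hash partitioning changes from 2 to 4.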

Now, if I remove the [range 
check](https://github.com/apache/datafusion/blob/d24eb4a23156b7814836e765d5890186ab40682f/datafusion/datasource/src/file_groups.rs#L192-L199),
 we get this:
```
new_plan: CoalescePartitionsExec: fetch=100
  AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[min(Int64(0))]
    CoalesceBatchesExec: target_batch_size=8192
      RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=4
        AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[min(Int64(0))]
          DataSourceExec: file_groups={4 groups: [[<removed for brevity>]]}, 
projection=[b], file_type=parquet
```

The partitioning of the `DataSourceExec` increases to 4 file groups, as expected.

Now my question is this: is this something that is just lacking an 
implementation? I don't see a good reason to avoid repartitioning when ranges 
exist for a file, but I don't have enough context to decide this. I'm willing 
to implement this if there are no objections to it.

GitHub link: 
https://github.com/apache/datafusion/discussions/18924#discussioncomment-15077000

----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: 
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to