Samyak2 opened a new issue, #18940:
URL: https://github.com/apache/datafusion/issues/18940

   ### Describe the bug
   
   I'm trying to use `ExecutionPlan::repartitioned` to change the partitioning 
of an already optimized plan. The reason I can't just set `target_partitions` 
is that I need to change the partitioning of a subtree of the plan, not the 
entire plan. So I tried recursively calling `ExecutionPlan::repartitioned` on 
every node of the plan and noticed that it did not change a thing.
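
   For context, the hook being used is this `ExecutionPlan` trait method, whose default implementation returns `Ok(None)` ("no repartitioned variant available"). This is paraphrased from the trait, so double-check the docs for the exact form:
   ```rust
   // Excerpt (paraphrased) from the ExecutionPlan trait: an operator may
   // return a new plan with the requested partition count, or Ok(None) to
   // signal that it cannot (or will not) change its partitioning.
   fn repartitioned(
       &self,
       _target_partitions: usize,
       _config: &ConfigOptions,
   ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
       Ok(None)
   }
   ```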
   
   ### To Reproduce
   
   Here's the code that shows the before and after plans (using DataFusion 51):
   ```rust
   use std::sync::Arc;
   
   use datafusion::{
       config::ConfigOptions,
       error::DataFusionError,
       physical_plan::{ExecutionPlan, displayable},
       prelude::*,
   };
   
   fn repartition_subtree(
       plan: Arc<dyn ExecutionPlan>,
       config: &ConfigOptions,
   ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
       let new_children = plan
           .children()
           .into_iter()
           .map(|child| repartition_subtree(Arc::clone(child), config))
           .collect::<Result<_, _>>()?;
       let plan = plan.with_new_children(new_children)?;
   
       // Change the partition count to 4. `repartitioned` returns `Ok(None)`
       // when a node cannot (or will not) change its partitioning, in which
       // case we keep the node as-is.
       Ok(plan.repartitioned(4, config)?.unwrap_or(plan))
   }
   
   #[tokio::main]
   async fn main() -> datafusion::error::Result<()> {
       let config = SessionConfig::new()
           .with_target_partitions(2);
   
       // register the table
       let ctx = SessionContext::new_with_config(config);
       ctx.register_parquet("example", "./partitioned_output", ParquetReadOptions::new())
           .await?;
   
       // create a plan to run a SQL query
       let df = ctx
           .sql("SELECT b, MIN(0) FROM example GROUP BY b LIMIT 100")
           .await?;
   
       let plan = df.create_physical_plan().await.unwrap();
   
       println!("plan: {}", displayable(plan.as_ref()).indent(false));
   
       let state = ctx.state();
   
       let new_plan = repartition_subtree(plan, state.config_options()).unwrap();
       println!("new_plan: {}", displayable(new_plan.as_ref()).indent(false));
   
       Ok(())
   }
   ```
   
   Here's the code to generate the data (excuse the code quality, it was 
written with AI):
   ```rust
   use std::sync::Arc;
   
   use arrow::array::{Int32Array, Int64Array};
   use arrow::datatypes::{DataType, Field, Schema};
   use arrow::record_batch::RecordBatch;
   use datafusion::dataframe::DataFrameWriteOptions;
   use datafusion::prelude::*;
   use rand::Rng;
   
   const NUM_ROWS: usize = 10_000_000;
   const BATCH_SIZE: usize = 1_000_000; // Process in batches to manage memory
   
   #[tokio::main]
   async fn main() -> datafusion::error::Result<()> {
       println!("Creating DataFusion context...");
       let ctx = SessionContext::new();
   
       // Define schema
       let schema = Arc::new(Schema::new(vec![
           Field::new("a", DataType::Int32, false), // 0 to 1000
           Field::new("b", DataType::Int64, false), // 0 to 1,000,000
       ]));
   
       println!("Generating {} random rows...", NUM_ROWS);
   
       // Generate data in batches to avoid memory issues
       let mut batches = Vec::new();
       let mut rng = rand::thread_rng();
   
       for batch_num in 0..(NUM_ROWS / BATCH_SIZE) {
           let start = batch_num * BATCH_SIZE;
           println!(
               "  Generating batch {} ({} - {} rows)...",
               batch_num + 1,
               start,
               start + BATCH_SIZE
           );
   
           // Generate random data for column 'a' (0 to 1000)
           let col_a: Vec<i32> = (0..BATCH_SIZE)
               .map(|_| rng.gen_range(0..=1000))
               .collect();
   
           // Generate random data for column 'b' (0 to 1,000,000)
           let col_b: Vec<i64> = (0..BATCH_SIZE)
               .map(|_| rng.gen_range(0..=1_000_000))
               .collect();
   
           let batch = RecordBatch::try_new(
               schema.clone(),
               vec![
                   Arc::new(Int32Array::from(col_a)),
                   Arc::new(Int64Array::from(col_b)),
               ],
           )?;
   
           batches.push(batch);
       }
   
       println!("Creating DataFrame from {} batches...", batches.len());
   
       // Create a DataFrame from the record batches
       let df = ctx.read_batches(batches)?;
   
       // Show sample data
       println!("\nSample data (first 10 rows):");
       df.clone().limit(0, Some(10))?.show().await?;
   
       // Show row count
       let row_count = df.clone().count().await?;
       println!("\nTotal rows: {}", row_count);
   
       // Write as partitioned parquet files (partitioned by column 'a')
       let output_path = "./partitioned_output";
       println!(
           "\nWriting partitioned parquet files to '{}'...",
           output_path
       );
       println!("Partitioning by column 'a'...");
   
       df.write_parquet(
           output_path,
           DataFrameWriteOptions::new().with_partition_by(vec!["a".to_string()]),
           None, // Use default parquet writer options
       )
       .await?;
   
       println!("\n✓ Successfully wrote partitioned parquet files!");
       println!("  Output directory: {}", output_path);
   
       Ok(())
   }
   ```
   
   With the latest main of DataFusion (commit `48cc4c8af3a5ad500a44c8625ea44d6b4827af1e`), we get this:
   ```
   plan: CoalescePartitionsExec: fetch=100
     AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[min(Int64(0))]
       CoalesceBatchesExec: target_batch_size=8192
         RepartitionExec: partitioning=Hash([b@0], 2), input_partitions=2
           AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[min(Int64(0))]
             DataSourceExec: file_groups={2 groups: [[<removed for brevity>]]}, projection=[b], file_type=parquet
   
   new_plan: CoalescePartitionsExec: fetch=100
     AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[min(Int64(0))]
       CoalesceBatchesExec: target_batch_size=8192
         RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=2
           AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[min(Int64(0))]
             DataSourceExec: file_groups={2 groups: [[<removed for brevity>]]}, projection=[b], file_type=parquet
   ```
   The scan's partitioning remains the same: the `DataSourceExec` still produces 2 file groups (`input_partitions=2`); only the `RepartitionExec`'s hash partition count changed to 4.
   
   Now, if I remove the [range check](https://github.com/apache/datafusion/blob/d24eb4a23156b7814836e765d5890186ab40682f/datafusion/datasource/src/file_groups.rs#L192-L199), we get this:
   ```
   new_plan: CoalescePartitionsExec: fetch=100
     AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[min(Int64(0))]
       CoalesceBatchesExec: target_batch_size=8192
         RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=4
           AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[min(Int64(0))]
             DataSourceExec: file_groups={4 groups: [[<removed for brevity>]]}, projection=[b], file_type=parquet
   ```
   
   The partitioning increases as expected.
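
   For reference, the check I removed looks roughly like this (paraphrased from `FileGroupPartitioner::repartition_file_groups` in `file_groups.rs`; see the link above for the exact code):
   ```rust
   // Redistribution is only attempted when every file is read from start to
   // end; if any file in any group already carries a byte range, the whole
   // repartitioning attempt bails out instead of re-splitting the ranges.
   let has_ranges = file_groups.iter().flat_map(|g| g.iter()).any(|f| f.range.is_some());
   if has_ranges {
       return None;
   }
   ```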
   
   ### Expected behavior
   
   The plan should be repartitioned even if ranges exist on the file groups. In the example above there are no filters, so each file is scanned in full; yet simply because ranges exist on the files, repartitioning is skipped.
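
   For context, the ranges in question live on each `PartitionedFile` of the scan. Here is a minimal sketch of the field involved (`describe` is a hypothetical helper; the types come from `datafusion::datasource::listing`):
   ```rust
   use datafusion::datasource::listing::{FileRange, PartitionedFile};

   // Hypothetical helper: a file's `range` is None when the file is read as
   // a single unit, or Some(FileRange { start, end }) (byte offsets) once it
   // has been split. Any Some value is enough to trip the check above.
   fn describe(file: &PartitionedFile) -> String {
       match &file.range {
           Some(FileRange { start, end }) => format!("bytes {start}..{end}"),
           None => "whole file".to_string(),
       }
   }
   ```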
   
   ### Additional context
   
   There doesn't seem to be a good reason to skip repartitioning when file ranges exist, and @Jefffrey agrees: https://github.com/apache/datafusion/discussions/18924#discussioncomment-15083186
   
   I'm willing to get a PR out for this if it makes sense.

