GitHub user Samyak2 added a comment to the discussion: `repartitioned` method
doesn't work for changing partitioning of a subtree after planning
Here's a better example:
```rust
use std::sync::Arc;

use datafusion::{
    config::ConfigOptions,
    error::DataFusionError,
    physical_plan::{ExecutionPlan, displayable},
    prelude::*,
};

fn repartition_subtree(
    plan: Arc<dyn ExecutionPlan>,
    config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
    let new_children = plan
        .children()
        .into_iter()
        .map(|child| repartition_subtree(Arc::clone(child), config))
        .collect::<Result<_, _>>()?;
    let plan = plan.with_new_children(new_children)?;
    // change partitions to 4
    let repartitioned_plan = plan.repartitioned(4, config)?;
    if let Some(new_plan) = repartitioned_plan {
        Ok(new_plan)
    } else {
        Ok(plan)
    }
}

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let config = SessionConfig::new()
        .with_target_partitions(2);
    // register the table
    let ctx = SessionContext::new_with_config(config);
    ctx.register_parquet("example", "./partitioned_output", ParquetReadOptions::new())
        .await?;
    // create a plan to run a SQL query
    let df = ctx
        .sql("SELECT b, MIN(0) FROM example GROUP BY b LIMIT 100")
        .await?;
    let plan = df.create_physical_plan().await.unwrap();
    println!("plan: {}", displayable(plan.as_ref()).indent(false));
    let state = ctx.state();
    let new_plan = repartition_subtree(plan, state.config_options()).unwrap();
    println!("new_plan: {}", displayable(new_plan.as_ref()).indent(false));
    Ok(())
}
```
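A std-only toy model of the bottom-up rewrite in `repartition_subtree` may help when reading the outputs: children are rebuilt first, then each node is asked to repartition, and a `None` answer keeps the node unchanged. `Plan`, `Repartition` and `Fixed` are hypothetical stand-ins that only mimic the `Option`-returning shape of `repartitioned` used above; they are not DataFusion types.

```rust
/// Hypothetical toy plan: not DataFusion's `ExecutionPlan`, just enough
/// structure to show the bottom-up rewrite.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    /// Stands in for a node (like `RepartitionExec`) that accepts a new target.
    Repartition { partitions: usize, input: Box<Plan> },
    /// Stands in for a node whose `repartitioned` declines with `None`.
    Fixed { partitions: usize },
}

impl Plan {
    /// Same shape as `repartitioned`: `Some(replacement)` or `None` ("keep me").
    fn repartitioned(&self, target: usize) -> Option<Plan> {
        match self {
            Plan::Repartition { input, .. } => Some(Plan::Repartition {
                partitions: target,
                input: input.clone(),
            }),
            Plan::Fixed { .. } => None,
        }
    }

    fn output_partitions(&self) -> usize {
        match self {
            Plan::Repartition { partitions, .. } | Plan::Fixed { partitions } => *partitions,
        }
    }
}

/// Rewrite the children first, then ask the rebuilt node to repartition,
/// falling back to the node itself when it returns `None`.
fn repartition_subtree(plan: Plan, target: usize) -> Plan {
    let plan = match plan {
        Plan::Repartition { partitions, input } => Plan::Repartition {
            partitions,
            input: Box::new(repartition_subtree(*input, target)),
        },
        leaf => leaf,
    };
    plan.repartitioned(target).unwrap_or(plan)
}

fn main() {
    let plan = Plan::Repartition {
        partitions: 2,
        input: Box::new(Plan::Fixed { partitions: 2 }),
    };
    let new_plan = repartition_subtree(plan, 4);
    // The repartition node picks up the new target; the fixed leaf does not.
    println!("{:?} -> {} partitions", new_plan, new_plan.output_partitions());
}
```

This mirrors the shape of the first output: the node that accepts the new target changes, while a child that declines keeps its old partition count.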
Here's the code to generate the data (please excuse the code quality; it was written with AI):
```rust
use std::sync::Arc;

use arrow::array::{Int32Array, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::prelude::*;
use rand::Rng;

const NUM_ROWS: usize = 10_000_000;
const BATCH_SIZE: usize = 1_000_000; // Process in batches to manage memory

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    println!("Creating DataFusion context...");
    let ctx = SessionContext::new();

    // Define schema
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false), // 0 to 1000
        Field::new("b", DataType::Int64, false), // 0 to 1,000,000
    ]));

    println!("Generating {} random rows...", NUM_ROWS);
    // Generate data in batches to avoid memory issues
    let mut batches = Vec::new();
    let mut rng = rand::thread_rng();
    for batch_num in 0..(NUM_ROWS / BATCH_SIZE) {
        let start = batch_num * BATCH_SIZE;
        println!(
            " Generating batch {} ({} - {} rows)...",
            batch_num + 1,
            start,
            start + BATCH_SIZE
        );
        // Generate random data for column 'a' (0 to 1000)
        let col_a: Vec<i32> = (0..BATCH_SIZE).map(|_| rng.gen_range(0..=1000)).collect();
        // Generate random data for column 'b' (0 to 1,000,000)
        let col_b: Vec<i64> = (0..BATCH_SIZE)
            .map(|_| rng.gen_range(0..=1_000_000))
            .collect();
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from(col_a)),
                Arc::new(Int64Array::from(col_b)),
            ],
        )?;
        batches.push(batch);
    }

    println!("Creating DataFrame from {} batches...", batches.len());
    // Create a DataFrame from the record batches
    let df = ctx.read_batches(batches)?;

    // Show sample data
    println!("\nSample data (first 10 rows):");
    df.clone().limit(0, Some(10))?.show().await?;

    // Show row count
    let row_count = df.clone().count().await?;
    println!("\nTotal rows: {}", row_count);

    // Write as partitioned parquet files (partitioned by column 'a')
    let output_path = "./partitioned_output";
    println!(
        "\nWriting partitioned parquet files to '{}'...",
        output_path
    );
    println!("Partitioning by column 'a'...");
    df.write_parquet(
        output_path,
        DataFrameWriteOptions::new().with_partition_by(vec!["a".to_string()]),
        None, // Use default parquet writer options
    )
    .await?;

    println!("\n✓ Successfully wrote partitioned parquet files!");
    println!(" Output directory: {}", output_path);
    Ok(())
}
```
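For context on the resulting layout: partitioning by `a` writes Hive-style directories named `a=<value>` under `./partitioned_output`, so with `a` in `0..=1000` there can be up to 1001 directories, and the value of `a` is encoded in the path. The std-only sketch below, with a hypothetical `hive_layout` helper, only groups rows the same way; it does not reproduce DataFusion's file naming or writer.

```rust
use std::collections::BTreeMap;

/// Hypothetical helper: group `(a, b)` rows by `a` into Hive-style
/// `a=<value>` directory names, keeping the `b` values for each directory.
fn hive_layout(rows: &[(i32, i64)]) -> BTreeMap<String, Vec<i64>> {
    let mut dirs: BTreeMap<String, Vec<i64>> = BTreeMap::new();
    for &(a, b) in rows {
        dirs.entry(format!("a={}", a)).or_default().push(b);
    }
    dirs
}

fn main() {
    let rows = [(0, 10), (1, 20), (0, 30)];
    for (dir, values) in hive_layout(&rows) {
        println!("{}/ -> b = {:?}", dir, values);
    }
}
```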
With the latest `main` of DataFusion (commit
`48cc4c8af3a5ad500a44c8625ea44d6b4827af1e`), we get this:
```
plan: CoalescePartitionsExec: fetch=100
  AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[min(Int64(0))]
    CoalesceBatchesExec: target_batch_size=8192
      RepartitionExec: partitioning=Hash([b@0], 2), input_partitions=2
        AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[min(Int64(0))]
          DataSourceExec: file_groups={2 groups: [[<removed for brevity>]]}, projection=[b], file_type=parquet

new_plan: CoalescePartitionsExec: fetch=100
  AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[min(Int64(0))]
    CoalesceBatchesExec: target_batch_size=8192
      RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=2
        AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[min(Int64(0))]
          DataSourceExec: file_groups={2 groups: [[<removed for brevity>]]}, projection=[b], file_type=parquet
```
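The `RepartitionExec` target changes from 2 to 4, but the scan's partitioning remains the same: `DataSourceExec` still has 2 file groups, so `input_partitions` stays at 2.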
Now, if I remove the [range
check](https://github.com/apache/datafusion/blob/d24eb4a23156b7814836e765d5890186ab40682f/datafusion/datasource/src/file_groups.rs#L192-L199),
we get this:
```
new_plan: CoalescePartitionsExec: fetch=100
  AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[min(Int64(0))]
    CoalesceBatchesExec: target_batch_size=8192
      RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=4
        AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[min(Int64(0))]
          DataSourceExec: file_groups={4 groups: [[<removed for brevity>]]}, projection=[b], file_type=parquet
```
Now `DataSourceExec` has 4 file groups and `input_partitions` is 4, so the partitioning increases as expected.
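A minimal std-only model of the behaviour above, assuming (not verified against the source) that the first planning pass with `target_partitions=2` already attached byte ranges to the scanned files: the hypothetical `repartition_file_groups` returns `None` as soon as any file carries a range, so the scan keeps its existing groups. `FileSlice` and the round-robin assignment of whole files are illustrative simplifications, not DataFusion's types or distribution logic.

```rust
/// Hypothetical stand-in for one entry of a scan's file group.
#[derive(Debug, Clone, PartialEq)]
struct FileSlice {
    path: String,
    /// Byte range `[start, end)` if an earlier pass already split the file.
    range: Option<(u64, u64)>,
}

/// Models the early return: no repartitioning once any file has a range.
/// Otherwise whole files are dealt round-robin into `target` groups
/// (a simplification, not DataFusion's actual distribution logic).
fn repartition_file_groups(
    groups: &[Vec<FileSlice>],
    target: usize,
) -> Option<Vec<Vec<FileSlice>>> {
    let files: Vec<&FileSlice> = groups.iter().flatten().collect();
    if target == 0 || files.iter().any(|f| f.range.is_some()) {
        return None;
    }
    let mut out = vec![Vec::new(); target];
    for (i, f) in files.into_iter().enumerate() {
        out[i % target].push(f.clone());
    }
    Some(out)
}

fn main() {
    let slice = |start, end| FileSlice {
        path: "a=0/part.parquet".to_string(), // hypothetical file name
        range: Some((start, end)),
    };
    // Two groups produced by a first pass that split one file in half.
    let groups = vec![vec![slice(0, 50)], vec![slice(50, 100)]];
    println!("{:?}", repartition_file_groups(&groups, 4)); // None
}
```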
Now my question is: is this just a missing implementation? I don't see a good
reason to avoid repartitioning when a file already has ranges, but I don't have
enough context to decide this. I'm willing to implement it if there are no
objections.
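One possible shape for such an implementation, as a hedged std-only sketch: treat every existing `(file, range)` entry as a smaller file and cut the combined byte span into at most `target` groups of roughly equal size, narrowing ranges where a group boundary falls inside an entry. `FileSlice` and `resplit` are hypothetical names, and the sketch ignores format-specific concerns (for Parquet, how byte ranges map onto row groups) that a real change to `file_groups.rs` would have to handle.

```rust
/// Hypothetical stand-in for one entry of a scan's file group.
#[derive(Debug, Clone, PartialEq)]
struct FileSlice {
    path: String,
    /// Full file size in bytes.
    size: u64,
    /// Byte range `[start, end)`; `None` means the whole file.
    range: Option<(u64, u64)>,
}

impl FileSlice {
    /// The `[start, end)` span this entry actually covers.
    fn span(&self) -> (u64, u64) {
        self.range.unwrap_or((0, self.size))
    }
}

/// Re-split existing entries (ranged or not) into at most `target` groups
/// of roughly `total / target` bytes each.
fn resplit(groups: &[Vec<FileSlice>], target: usize) -> Vec<Vec<FileSlice>> {
    let slices: Vec<&FileSlice> = groups.iter().flatten().collect();
    let total: u64 = slices.iter().map(|s| s.span().1 - s.span().0).sum();
    if target == 0 || total == 0 {
        return groups.to_vec();
    }
    // Ceiling division, so `target` chunks always cover `total` bytes.
    let chunk = (total + target as u64 - 1) / target as u64;
    let mut out: Vec<Vec<FileSlice>> = vec![Vec::new()];
    let mut filled = 0u64; // bytes already placed in the current (last) group
    for s in slices {
        let (mut start, end) = s.span();
        while start < end {
            if filled == chunk {
                out.push(Vec::new());
                filled = 0;
            }
            let take = (end - start).min(chunk - filled);
            out.last_mut().unwrap().push(FileSlice {
                path: s.path.clone(),
                size: s.size,
                range: Some((start, start + take)),
            });
            start += take;
            filled += take;
        }
    }
    out
}

fn main() {
    let slice = |path: &str, start, end| FileSlice {
        path: path.to_string(),
        size: 100,
        range: Some((start, end)),
    };
    // Two groups left by a first pass, each holding one ranged entry.
    let groups = vec![vec![slice("f1", 0, 100)], vec![slice("f2", 0, 100)]];
    for (i, group) in resplit(&groups, 4).iter().enumerate() {
        println!("group {}: {:?}", i, group);
    }
}
```

With two 100-byte entries and `target = 4`, this yields four 50-byte ranges: `f1` `[0, 50)`, `f1` `[50, 100)`, `f2` `[0, 50)` and `f2` `[50, 100)`. Because existing ranges are only narrowed, never widened, each byte is still assigned to exactly one group.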
GitHub link:
https://github.com/apache/datafusion/discussions/18924#discussioncomment-15077000