2010YOUY01 opened a new issue, #15088:
URL: https://github.com/apache/datafusion/issues/15088
### Is your feature request related to a problem or challenge?

Both the sort executor and the aggregate executor now support parallel execution, and the degree of parallelism is set by the configuration option [`datafusion.execution.target_partitions`](https://datafusion.apache.org/user-guide/configs.html). When the input to a sort or aggregate executor has fewer output partitions than `target_partitions`, both executors would benefit from a round-robin repartition, because the input may generate data faster than those executors can process it.

If we construct a memory table with only one output partition, aggregate queries automatically get a `RepartitionExec` that performs a round-robin repartition. I think this behavior is expected:

```
+---------------+------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                     |
+---------------+------------------------------------------------------------------------------------------+
| logical_plan  | Projection: min(users.bank_account)                                                      |
|               |   Aggregate: groupBy=[[users.id]], aggr=[[min(users.bank_account)]]                      |
|               |     TableScan: users projection=[id, bank_account]                                       |
| physical_plan | ProjectionExec: expr=[min(users.bank_account)@1 as min(users.bank_account)]              |
|               |   AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[min(users.bank_account)] |
|               |     CoalesceBatchesExec: target_batch_size=8192                                          |
|               |       RepartitionExec: partitioning=Hash([id@0], 14), input_partitions=14                |
|               |         AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[min(users.bank_account)]    |
|               |           RepartitionExec: partitioning=RoundRobinBatch(14), input_partitions=1          |
|               |             DataSourceExec: partitions=1, partition_sizes=[100]                          |
|               |                                                                                          |
+---------------+------------------------------------------------------------------------------------------+
```

However, sort queries don't get the same repartition:
```
+---------------+---------------------------------------------------------------------+
| plan_type     | plan                                                                |
+---------------+---------------------------------------------------------------------+
| logical_plan  | Sort: users.id ASC NULLS LAST                                       |
|               |   TableScan: users projection=[id, bank_account]                    |
| physical_plan | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] |
|               |   DataSourceExec: partitions=1, partition_sizes=[100]               |
|               |                                                                     |
+---------------+---------------------------------------------------------------------+
```

You can find the reproducer below.

<details>
<summary>Reproducer</summary>

Place the following file in `datafusion-examples/examples`:

```rust
use datafusion::arrow::array::{UInt64Array, UInt8Array};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::*;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<()> {
    query_memtable().await?;
    Ok(())
}

/// Run a simple query against a [`MemTable`]
pub async fn query_memtable() -> Result<()> {
    let mem_table = create_memtable()?;

    let ctx = SessionContext::new();
    ctx.register_table("users", Arc::new(mem_table))?;

    // Both aggregate and sort can benefit from repartitioning; however, only the
    // aggregation is repartitioned when the input is an in-memory source
    // (`DataSourceExec` in the plans above) with fewer partitions than the
    // `target_partitions` configuration option.

    // This aggregation query gets round-robin repartitioned
    let dataframe = ctx
        .sql("explain SELECT min(bank_account) FROM users group by id;")
        .await?;
    dataframe.show().await?;

    // This sort query doesn't get repartitioned
    let dataframe = ctx.sql("explain SELECT * FROM users order by id;").await?;
    dataframe.show().await?;

    Ok(())
}

/// Create a [`MemTable`] with 100 batches of 8192 rows each, in 1 partition
fn create_memtable() -> Result<MemTable> {
    let mut batches = Vec::with_capacity(100);
    for _ in 0..100 {
        batches.push(create_record_batch()?);
    }
    let partitions = vec![batches];
    MemTable::try_new(get_schema(), partitions)
}

fn create_record_batch() -> Result<RecordBatch> {
    let id_array = UInt8Array::from(vec![1; 8192]);
    let account_array = UInt64Array::from(vec![9000; 8192]);

    // `?` converts the Arrow error into a `DataFusionError` instead of panicking
    Ok(RecordBatch::try_new(
        get_schema(),
        vec![Arc::new(id_array), Arc::new(account_array)],
    )?)
}

fn get_schema() -> SchemaRef {
    SchemaRef::new(Schema::new(vec![
        Field::new("id", DataType::UInt8, false),
        Field::new("bank_account", DataType::UInt64, true),
    ]))
}
```

</details>

### Describe the solution you'd like

Let sort queries also insert a round-robin repartition when the input plan does not have enough partitions. I guess this means modifying something in the `enforce_distribution.rs` physical optimizer rule 🤔

### Describe alternatives you've considered

_No response_

### Additional context

_No response_
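For reference, the `RoundRobinBatch(14)` node in the aggregate plan spreads the batches of the single input partition across 14 output partitions so downstream operators can work on them in parallel. The std-only Rust sketch below is a simplified model of that dealing order, assuming batch *i* goes to output partition *i* mod *n* starting from partition 0. The `round_robin` function is hypothetical and is not DataFusion's `RepartitionExec`, which operates on streams of batches rather than on in-memory vectors.

```rust
/// Hypothetical helper: deal `batches` from one input partition across `n`
/// output partitions in round-robin order (batch i goes to partition i % n).
fn round_robin<T>(batches: Vec<T>, n: usize) -> Vec<Vec<T>> {
    assert!(n > 0, "need at least one output partition");
    let mut partitions: Vec<Vec<T>> = (0..n).map(|_| Vec::new()).collect();
    for (i, batch) in batches.into_iter().enumerate() {
        partitions[i % n].push(batch);
    }
    partitions
}

fn main() {
    // Stand-ins for the 100 batches of the single-partition MemTable in the reproducer.
    let batches: Vec<usize> = (0..100).collect();
    let partitions = round_robin(batches, 14);
    // Print how many batches each output partition received.
    let sizes: Vec<usize> = partitions.iter().map(Vec::len).collect();
    println!("{sizes:?}");
}
```

With 100 batches and 14 partitions, the first two partitions receive 8 batches each and the remaining twelve receive 7, so the work is spread almost evenly.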

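One possible shape for the requested behavior, sketched as a toy plan rewrite: when a sort must produce a single sorted output and its input has fewer partitions than `target_partitions`, insert a round-robin repartition, sort each partition independently (`preserve_partitioning=[true]`), and merge the sorted partitions back into one stream, which is the role `SortPreservingMergeExec` plays in DataFusion. The `Plan` enum and `parallelize_sort` function are hypothetical stand-ins, not DataFusion's `ExecutionPlan` API or the actual `enforce_distribution.rs` logic, and the condition deliberately ignores any cost or configuration checks a real optimizer rule would apply.

```rust
/// Hypothetical, simplified stand-in for a physical plan tree
/// (not DataFusion's `ExecutionPlan` trait).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    /// Leaf data source with a fixed number of output partitions.
    Source { partitions: usize },
    /// Round-robin repartition into `partitions` outputs.
    RoundRobin { partitions: usize, input: Box<Plan> },
    /// Sort; with `preserve_partitioning` each partition is sorted on its own,
    /// otherwise all input is sorted into a single output partition.
    Sort { preserve_partitioning: bool, input: Box<Plan> },
    /// Merges already-sorted partitions into one sorted output partition.
    SortPreservingMerge { input: Box<Plan> },
}

impl Plan {
    /// Number of output partitions this node produces.
    fn output_partitions(&self) -> usize {
        match self {
            Plan::Source { partitions } | Plan::RoundRobin { partitions, .. } => *partitions,
            Plan::Sort { preserve_partitioning: true, input } => input.output_partitions(),
            Plan::Sort { preserve_partitioning: false, .. } => 1,
            Plan::SortPreservingMerge { .. } => 1,
        }
    }
}

/// Hypothetical rewrite: a single-output sort whose input has fewer than
/// `target_partitions` partitions becomes
/// SortPreservingMerge(Sort[preserve_partitioning](RoundRobin(input))).
fn parallelize_sort(plan: Plan, target_partitions: usize) -> Plan {
    match plan {
        Plan::Sort { preserve_partitioning: false, input }
            if target_partitions > 1 && input.output_partitions() < target_partitions =>
        {
            let repartitioned = Plan::RoundRobin { partitions: target_partitions, input };
            let sorted = Plan::Sort {
                preserve_partitioning: true,
                input: Box::new(repartitioned),
            };
            Plan::SortPreservingMerge { input: Box::new(sorted) }
        }
        // Anything else is left unchanged in this sketch.
        other => other,
    }
}

fn main() {
    // Mirrors the issue: one input partition, target_partitions = 14.
    let plan = Plan::Sort {
        preserve_partitioning: false,
        input: Box::new(Plan::Source { partitions: 1 }),
    };
    let rewritten = parallelize_sort(plan, 14);
    println!("{rewritten:#?}");
}
```

The merge on top keeps the overall result a single, globally sorted partition, so the query output is unchanged while the per-partition sorts can run in parallel.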