2010YOUY01 opened a new issue, #15088:
URL: https://github.com/apache/datafusion/issues/15088

   ### Is your feature request related to a problem or challenge?
   
   Now sort executor and aggregate executor both support executing in parallel, 
and the degree of parallelism is specified in configuration 
[`datafusion.execution.target_partitions`](https://datafusion.apache.org/user-guide/configs.html).
   When the input of sort/aggregate executor has less output partition than 
`target_partitions`, both executors would benefit from a round-robin 
repartition, because input may generate data faster than those executors can 
process.
   
   If we construct a memory table with only one output partition, aggregate 
queries will automatically insert a `RepartitionExec` to do round-robin 
repartition. I think this behavior is expected
   ```
   
+---------------+------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                       
                              |
   
+---------------+------------------------------------------------------------------------------------------+
   | logical_plan  | Projection: min(users.bank_account)                        
                              |
   |               |   Aggregate: groupBy=[[users.id]], 
aggr=[[min(users.bank_account)]]                      |
   |               |     TableScan: users projection=[id, bank_account]         
                              |
   | physical_plan | ProjectionExec: expr=[min(users.bank_account)@1 as 
min(users.bank_account)]              |
   |               |   AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], 
aggr=[min(users.bank_account)] |
   |               |     CoalesceBatchesExec: target_batch_size=8192            
                              |
   |               |       RepartitionExec: partitioning=Hash([id@0], 14), 
input_partitions=14                |
   |               |         AggregateExec: mode=Partial, gby=[id@0 as id], 
aggr=[min(users.bank_account)]    |
   |               |           RepartitionExec: 
partitioning=RoundRobinBatch(14), input_partitions=1          |
   |               |             DataSourceExec: partitions=1, 
partition_sizes=[100]                          |
   |               |                                                            
                              |
   
+---------------+------------------------------------------------------------------------------------------+
   ```
   
   However, sort queries won't do the same repartition.
   ```
   
+---------------+---------------------------------------------------------------------+
   | plan_type     | plan                                                       
         |
   
+---------------+---------------------------------------------------------------------+
   | logical_plan  | Sort: users.id ASC NULLS LAST                              
         |
   |               |   TableScan: users projection=[id, bank_account]           
         |
   | physical_plan | SortExec: expr=[id@0 ASC NULLS LAST], 
preserve_partitioning=[false] |
   |               |   DataSourceExec: partitions=1, partition_sizes=[100]      
         |
   |               |                                                            
         |
   
+---------------+---------------------------------------------------------------------+
   ```
   
   You can find the reproducer below
   
   <details>
   <summary> Reproducer </summary>
   
   Place the following file in `datafusion-examples/examples`
   ```rust
   use datafusion::arrow::array::{UInt64Array, UInt8Array};
   use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
   use datafusion::arrow::record_batch::RecordBatch;
   use datafusion::datasource::MemTable;
   use datafusion::error::Result;
   use datafusion::prelude::*;
   use std::sync::Arc;
   
   #[tokio::main]
   async fn main() -> Result<()> {
       query_memtable().await?;
       Ok(())
   }
   
   /// Run a simple query against a [`MemTable`]
   pub async fn query_memtable() -> Result<()> {
       let mem_table = create_memtable()?;
       let ctx = SessionContext::new();
       ctx.register_table("users", Arc::new(mem_table))?;
   
       // Both aggregate and sort can benefit from repartitioning, however only 
aggregation
       // is repartitioned, if input is a `MemoryExec` with smaller partitions 
than the
       // `target_partitions` configuration option.
   
       // This aggregation query can get round-robin repartitioned
       let dataframe = ctx
           .sql("explain SELECT min(bank_account) FROM users group by id;")
           .await?;
   
       dataframe.clone().show().await?;
   
       // This sort query won't get repartitioned
       let dataframe = ctx.sql("explain SELECT * FROM users order by 
id;").await?;
   
       dataframe.clone().show().await?;
   
       Ok(())
   }
   
   /// Create a [`MemTable`] with 100 batches of 8192 rows each, in 1 partition
   fn create_memtable() -> Result<MemTable> {
       let mut batches = Vec::with_capacity(100);
       for _ in 0..100 {
           batches.push(create_record_batch()?);
       }
       let partitions = vec![batches];
       MemTable::try_new(get_schema(), partitions)
   }
   
   fn create_record_batch() -> Result<RecordBatch> {
       let id_array = UInt8Array::from(vec![1; 8192]);
       let account_array = UInt64Array::from(vec![9000; 8192]);
   
       Ok(RecordBatch::try_new(
           get_schema(),
           vec![Arc::new(id_array), Arc::new(account_array)],
       )
       .unwrap())
   }
   
   fn get_schema() -> SchemaRef {
       SchemaRef::new(Schema::new(vec![
           Field::new("id", DataType::UInt8, false),
           Field::new("bank_account", DataType::UInt64, true),
       ]))
   }
   ```
   
   </details>
   
   ### Describe the solution you'd like
   
   Let sort query also be able to insert round-robin repartition, if there are 
not enough partitions in the input plan.
   I guess it's modifying something in `enforce_distribution.rs` physical 
optimizer rule 🤔 
   
   ### Describe alternatives you've considered
   
   _No response_
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to