Ke-Wng commented on issue #18989:
URL: https://github.com/apache/datafusion/issues/18989#issuecomment-3632927866

   ## **Polished Issue Description**
   
   Hi @kosiew 
   
   I inspected the physical plan for both the successful and the failed cases 
and found a structural difference that appears to explain the failure.
   
   ### **Physical plan in the successful case**
   
   ```text
   SortExec: expr=[ts@0 ASC], preserve_partitioning=[false]
     CoalescePartitionsExec
       // Aggr 1
       AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], 
aggr=[count(count(metrics.value))]
         CoalesceBatchesExec: target_batch_size=8192
           /* <------------------------------------->  */
           RepartitionExec: partitioning=Hash([ts@0], 64), input_partitions=64
           /* <------------------------------------->  */
             AggregateExec: mode=Partial, gby=[ts@0 as ts], 
aggr=[count(count(metrics.value))]
               ProjectionExec: expr=[ts@1 as ts, count(metrics.value)@2 as 
count(metrics.value)]
                 // Aggr 2
                 AggregateExec: mode=FinalPartitioned, gby=[region@0 as region, 
ts@1 as ts], aggr=[count(metrics.value)]
                   CoalesceBatchesExec: target_batch_size=8192
                     RepartitionExec: partitioning=Hash([region@0, ts@1], 64), 
input_partitions=2
                       AggregateExec: mode=Partial, gby=[region@1 as region, 
ts@0 as ts], aggr=[count(metrics.value)]
                         DataSourceExec: partitions=2, partition_sizes=[1, 1]
   ```
   
   ### **Physical plan in the failed case**
   
   ```text
   // Aggr 1
   AggregateExec: mode=SinglePartitioned, gby=[ts@0 as ts], 
aggr=[count(count(metrics.value))]
     ProjectionExec: expr=[ts@1 as ts, count(metrics.value)@2 as 
count(metrics.value)]
       /* <------------------------------------->  */
       // !!! Missing RepartitionExec !!!
       /* <------------------------------------->  */
       // Aggr 2
       AggregateExec: mode=FinalPartitioned, gby=[region@0 as region, ts@1 as 
ts], aggr=[count(metrics.value)]
         CoalesceBatchesExec: target_batch_size=8192
           RepartitionExec: partitioning=Hash([region@0, ts@1], 64), 
input_partitions=2
             AggregateExec: mode=Partial, gby=[region@1 as region, ts@0 as ts], 
aggr=[count(metrics.value)]
               DataSourceExec: partitions=2, partition_sizes=[0, 0]
   ```
   
   ### **Error message**
   
   ```text
   thread 'main' panicked at:
   Failed to create physical plan: Context("SanityCheckPlan", Plan("Plan: [...] 
does not satisfy distribution requirements: HashPartitioned[[ts@0]]). Child-0 
output partitioning: Hash([region@0, ts@0], 64)"))
   ```
   
   ### **Analysis**
   
   Based on the error message, `Aggr 1` expects its **input** to be partitioned 
by `ts` only (`HashPartitioned[[ts@0]]`).
   However, in the failed case, `Aggr 2` produces **output** partitioned by 
both `region` and `ts` (`Hash([region@0, ts@0], 64)`), which does not satisfy 
`Aggr 1`’s distribution requirement.
   
   In the successful plan, the optimizer correctly inserts a:
   
   ```text
   RepartitionExec: partitioning=Hash([ts@0], 64)
   ```
   
   between `Aggr 2` and `Aggr 1`, ensuring that `Aggr 1` receives the 
distribution it requires.
   
   In the failed plan, this `RepartitionExec` is **missing**, leading to the 
sanity check failure.
   
   ### **Conclusion**
   
   It seems that the physical optimizer does not insert the required 
repartitioning operator in this scenario, resulting in an invalid plan. This 
likely indicates a bug in how distribution requirements are propagated or 
validated when multiple aggregations and projections are involved.
   
   ### **To reproduce**
   ```rust
   use std::sync::Arc;
   
   use datafusion::arrow::array::{Float64Array, Int64Array, StringArray};
   use datafusion::arrow::datatypes::{DataType, Field, Schema};
   use datafusion::arrow::record_batch::RecordBatch;
   use datafusion::arrow::util::pretty::print_batches;
   use datafusion::datasource::MemTable;
   use datafusion::functions_aggregate::count::count_udaf;
   use datafusion::logical_expr::col;
   use datafusion::physical_plan::{collect, displayable};
   use datafusion::prelude::*;
   
   #[tokio::main]
   async fn main() {
       let ctx = SessionContext::default();
   
       let schema = Arc::new(Schema::new(vec![
           Field::new("ts", DataType::Int64, false),
           Field::new("region", DataType::Utf8, false),
           Field::new("value", DataType::Float64, false),
       ]));
   
       // partition 1: us-west region
       let partition1 = RecordBatch::try_new(
           schema.clone(),
           vec![
               Arc::new(Int64Array::from(vec![1000, 1000, 2000, 2000])),
               Arc::new(StringArray::from(vec![
                   "us-west", "us-west", "us-west", "us-west",
               ])),
               Arc::new(Float64Array::from(vec![10.5, 20.3, 15.2, 25.8])),
           ],
       )
       .expect("Failed to create partition 1");
   
       // partition 2: eu-east region
       let partition2 = RecordBatch::try_new(
           schema.clone(),
           vec![
               Arc::new(Int64Array::from(vec![1000, 1000, 2000])),
               Arc::new(StringArray::from(vec!["eu-east", "eu-east", 
"eu-east"])),
               Arc::new(Float64Array::from(vec![30.1, 40.2, 35.5])),
           ],
       )
       .expect("Failed to create partition 2");
   
       let mem_table = MemTable::try_new(schema.clone(), vec![vec![partition1], 
vec![partition2]])
           .expect("Failed to create MemTable");
       // uncomment the following line to reproduce the panic
       // let mem_table = MemTable::try_new(schema.clone(), vec![vec![], 
vec![]])
       //     .expect("Failed to create MemTable");
       ctx.register_table("metrics", Arc::new(mem_table))
           .expect("Failed to register table");
   
       let data_frame = ctx
           .table("metrics")
           .await
           .expect("Failed to get table")
           .aggregate(
               vec![col("region"), col("ts")],
               vec![count_udaf().call(vec![col("value")])],
           )
           .expect("Failed first aggregate")
           .sort(vec![
               col("region").sort(true, true),
               col("ts").sort(true, true),
           ])
           .expect("Failed first sort")
           .aggregate(
               vec![col("ts")],
               vec![count_udaf().call(vec![col("count(metrics.value)")])],
           )
           .expect("Failed second aggregate")
           .sort(vec![col("ts").sort(true, true)])
           .expect("Failed second sort");
   
       println!(
           "Logical Plan:\n{}",
           data_frame.logical_plan().display_indent()
       );
   
       let plan = data_frame
           .create_physical_plan()
           .await
           .expect("Failed to create physical plan");
   
       println!(
           "\nPhysical Plan:\n{}",
           displayable(plan.as_ref()).indent(true)
       );
   
       println!("\nExecuting query (should not panic)...");
   
       let task_ctx = ctx.task_ctx();
       let results = collect(plan, task_ctx).await.expect("Failed to execute");
   
       print_batches(&results).expect("Failed to print batches");
       
       println!("\n✅ Success! The query executed without panicking.");
   }
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to