Ke-Wng commented on issue #18989:
URL: https://github.com/apache/datafusion/issues/18989#issuecomment-3632927866
## **Polished Issue Description**
Hi @kosiew
I inspected the physical plan for both the successful and the failed cases
and found a structural difference that appears to explain the failure.
### **Physical plan in the successful case**
```text
SortExec: expr=[ts@0 ASC], preserve_partitioning=[false]
  CoalescePartitionsExec
    // Aggr 1
    AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(count(metrics.value))]
      CoalesceBatchesExec: target_batch_size=8192
        /* <-------------------------------------> */
        RepartitionExec: partitioning=Hash([ts@0], 64), input_partitions=64
        /* <-------------------------------------> */
        AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(count(metrics.value))]
          ProjectionExec: expr=[ts@1 as ts, count(metrics.value)@2 as count(metrics.value)]
            // Aggr 2
            AggregateExec: mode=FinalPartitioned, gby=[region@0 as region, ts@1 as ts], aggr=[count(metrics.value)]
              CoalesceBatchesExec: target_batch_size=8192
                RepartitionExec: partitioning=Hash([region@0, ts@1], 64), input_partitions=2
                  AggregateExec: mode=Partial, gby=[region@1 as region, ts@0 as ts], aggr=[count(metrics.value)]
                    DataSourceExec: partitions=2, partition_sizes=[1, 1]
```
### **Physical plan in the failed case**
```text
// Aggr 1
AggregateExec: mode=SinglePartitioned, gby=[ts@0 as ts], aggr=[count(count(metrics.value))]
  ProjectionExec: expr=[ts@1 as ts, count(metrics.value)@2 as count(metrics.value)]
    /* <-------------------------------------> */
    // !!! Missing RepartitionExec !!!
    /* <-------------------------------------> */
    // Aggr 2
    AggregateExec: mode=FinalPartitioned, gby=[region@0 as region, ts@1 as ts], aggr=[count(metrics.value)]
      CoalesceBatchesExec: target_batch_size=8192
        RepartitionExec: partitioning=Hash([region@0, ts@1], 64), input_partitions=2
          AggregateExec: mode=Partial, gby=[region@1 as region, ts@0 as ts], aggr=[count(metrics.value)]
            DataSourceExec: partitions=2, partition_sizes=[0, 0]
```
### **Error message**
```text
thread 'main' panicked at:
Failed to create physical plan: Context("SanityCheckPlan", Plan("Plan: [...]
does not satisfy distribution requirements: HashPartitioned[[ts@0]]).
Child-0 output partitioning: Hash([region@0, ts@0], 64)"))
```
### **Analysis**
Based on the error message, `Aggr 1` expects its **input** to be partitioned
by `ts` only (`HashPartitioned[[ts@0]]`).
However, in the failed case, `Aggr 2` produces **output** partitioned by
both `region` and `ts` (`Hash([region@0, ts@0], 64)`), which does not satisfy
`Aggr 1`’s distribution requirement.
In the successful plan, the optimizer correctly inserts a:
```text
RepartitionExec: partitioning=Hash([ts@0], 64)
```
between `Aggr 2` and `Aggr 1`, ensuring that `Aggr 1` receives the
distribution it requires.
In the failed plan, this `RepartitionExec` is **missing**, leading to the
sanity check failure.
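To make the distribution mismatch concrete, here is a small hand-rolled sketch (using `DefaultHasher` as a stand-in, not DataFusion's actual hash partitioning): when rows are routed by `ts` alone, all equal-`ts` rows land in the same partition, but once `region` participates in the hash key, equal-`ts` rows can be scattered, which is why `Aggr 1` cannot consume `Aggr 2`'s output directly.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Toy model of hash partitioning: route a key to one of `n` partitions.
fn partition_of<K: Hash>(key: &K, n: usize) -> usize {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() as usize) % n
}

fn main() {
    let n = 4;
    // Rows as (region, ts) pairs, all sharing ts = 1000.
    let rows = [("us-west", 1000i64), ("eu-east", 1000), ("ap-south", 1000)];

    // Partitioned on ts alone: identical keys hash identically, so every
    // ts = 1000 row lands in the same partition -- what Aggr 1 requires.
    let by_ts: Vec<usize> = rows.iter().map(|(_, ts)| partition_of(ts, n)).collect();
    assert!(by_ts.iter().all(|&p| p == by_ts[0]));

    // Partitioned on (region, ts): the region column also feeds the hash,
    // so equal-ts rows may be spread across partitions. An aggregation
    // keyed on ts alone cannot safely consume this distribution without
    // a RepartitionExec in between.
    let by_both: Vec<usize> = rows.iter().map(|r| partition_of(r, n)).collect();
    println!("partitions by (region, ts): {:?}", by_both);
}
```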
### **Conclusion**
It appears that the physical optimizer fails to insert the required
repartitioning operator in this scenario, leaving an invalid plan that the
sanity check then rejects. Notably, the failure is triggered when both
MemTable partitions are empty (`partition_sizes=[0, 0]`), in which case
`Aggr 1` is planned as `mode=SinglePartitioned`. This likely indicates a bug
in how distribution requirements are propagated or validated when multiple
aggregations and projections are involved.
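For intuition only, a minimal model of the condition the sanity check appears to enforce (a hypothetical helper, not DataFusion's API): an output `Hash(out)` distribution can satisfy a `Hash(req)` requirement only when every output hash column is among the required columns, since any extra column (here, `region`) can split rows that agree on the required key.

```rust
/// Hypothetical predicate sketching the idea behind the check:
/// Hash(out) satisfies Hash(req) only if `out` is a non-empty subset
/// of `req` -- otherwise rows that agree on `req` (e.g. the same ts)
/// can be scattered by the extra hash columns.
fn hash_satisfies(out: &[&str], req: &[&str]) -> bool {
    !out.is_empty() && out.iter().all(|c| req.contains(c))
}

fn main() {
    // Hash([ts]) satisfies a requirement on [ts] -- the successful plan.
    assert!(hash_satisfies(&["ts"], &["ts"]));
    // Hash([region, ts]) does not satisfy a requirement on [ts] --
    // exactly the mismatch reported by SanityCheckPlan above.
    assert!(!hash_satisfies(&["region", "ts"], &["ts"]));
    println!("distribution checks behave as described");
}
```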
### **To reproduce**
```rust
use std::sync::Arc;

use datafusion::arrow::array::{Float64Array, Int64Array, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::util::pretty::print_batches;
use datafusion::datasource::MemTable;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::logical_expr::col;
use datafusion::physical_plan::{collect, displayable};
use datafusion::prelude::*;

#[tokio::main]
async fn main() {
    let ctx = SessionContext::default();

    let schema = Arc::new(Schema::new(vec![
        Field::new("ts", DataType::Int64, false),
        Field::new("region", DataType::Utf8, false),
        Field::new("value", DataType::Float64, false),
    ]));

    // partition 1: us-west region
    let partition1 = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int64Array::from(vec![1000, 1000, 2000, 2000])),
            Arc::new(StringArray::from(vec![
                "us-west", "us-west", "us-west", "us-west",
            ])),
            Arc::new(Float64Array::from(vec![10.5, 20.3, 15.2, 25.8])),
        ],
    )
    .expect("Failed to create partition 1");

    // partition 2: eu-east region
    let partition2 = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int64Array::from(vec![1000, 1000, 2000])),
            Arc::new(StringArray::from(vec!["eu-east", "eu-east", "eu-east"])),
            Arc::new(Float64Array::from(vec![30.1, 40.2, 35.5])),
        ],
    )
    .expect("Failed to create partition 2");

    let mem_table = MemTable::try_new(schema.clone(), vec![vec![partition1], vec![partition2]])
        .expect("Failed to create MemTable");

    // uncomment the following lines to reproduce the panic
    // (two empty partitions):
    // let mem_table = MemTable::try_new(schema.clone(), vec![vec![], vec![]])
    //     .expect("Failed to create MemTable");

    ctx.register_table("metrics", Arc::new(mem_table))
        .expect("Failed to register table");

    let data_frame = ctx
        .table("metrics")
        .await
        .expect("Failed to get table")
        // Aggr 2: group by (region, ts)
        .aggregate(
            vec![col("region"), col("ts")],
            vec![count_udaf().call(vec![col("value")])],
        )
        .expect("Failed first aggregate")
        .sort(vec![
            col("region").sort(true, true),
            col("ts").sort(true, true),
        ])
        .expect("Failed first sort")
        // Aggr 1: group by ts only
        .aggregate(
            vec![col("ts")],
            vec![count_udaf().call(vec![col("count(metrics.value)")])],
        )
        .expect("Failed second aggregate")
        .sort(vec![col("ts").sort(true, true)])
        .expect("Failed second sort");

    println!(
        "Logical Plan:\n{}",
        data_frame.logical_plan().display_indent()
    );

    let plan = data_frame
        .create_physical_plan()
        .await
        .expect("Failed to create physical plan");

    println!(
        "\nPhysical Plan:\n{}",
        displayable(plan.as_ref()).indent(true)
    );

    println!("\nExecuting query (should not panic)...");
    let task_ctx = ctx.task_ctx();
    let results = collect(plan, task_ctx).await.expect("Failed to execute");
    print_batches(&results).expect("Failed to print batches");

    println!("\n✅ Success! The query executed without panicking.");
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]