rluvaton opened a new issue, #15530:
URL: https://github.com/apache/datafusion/issues/15530
### Describe the bug
When `AggregateExec` runs in `Single` mode and spills, and the group by
expressions are not the first expressions of the previous (input) plan, the
aggregation fails with a schema mismatch.
### To Reproduce
```rust
#[cfg(test)]
mod tests {
use std::fmt::{Display, Formatter};
use arrow::array::{ArrayRef, Int32Array, Int64Array, RecordBatch,
StringArray};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::common::Result;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::execution::TaskContext;
use datafusion::functions_aggregate::sum::sum_udaf;
use datafusion::physical_expr::aggregate::AggregateExprBuilder;
use datafusion::physical_expr::expressions::{lit, Column};
use datafusion::physical_plan::aggregates::{PhysicalGroupBy,
AggregateExec, AggregateMode};
use datafusion::physical_plan::common::collect;
use datafusion::physical_plan::ExecutionPlan;
use rand::{random, thread_rng, Rng};
use std::sync::Arc;
use datafusion_physical_plan::memory::{LazyBatchGenerator,
LazyMemoryExec};
use parking_lot::RwLock;
#[tokio::test]
async fn test_debug() -> Result<()> {
// Input schema of the scan: nine nullable columns. `col_0` is Int64, `col_5`
// is Int32 and every other column is Utf8.
let scan_schema = Arc::new(Schema::new(vec![
Field::new("col_0", DataType::Int64, true),
Field::new("col_1", DataType::Utf8, true),
Field::new("col_2", DataType::Utf8, true),
Field::new("col_3", DataType::Utf8, true),
Field::new("col_4", DataType::Utf8, true),
Field::new("col_5", DataType::Int32, true),
Field::new("col_6", DataType::Utf8, true),
Field::new("col_7", DataType::Utf8, true),
Field::new("col_8", DataType::Utf8, true),
]));
// Group by col_1, col_7, col_0 and col_8. Each `Column` carries the index of
// the column in `scan_schema` (1, 7, 0, 8), so the group keys are neither the
// leading input columns nor in input order.
let group_by = PhysicalGroupBy::new_single(vec![
(Arc::new(Column::new("col_1", 1)), "col_1".to_string()),
(Arc::new(Column::new("col_7", 7)), "col_7".to_string()),
(Arc::new(Column::new("col_0", 0)), "col_0".to_string()),
(Arc::new(Column::new("col_8", 8)), "col_8".to_string()),
]);
fn generate_int64_array() -> ArrayRef {
Arc::new(Int64Array::from_iter_values(
(0..8192).map(|_| random::<i64>()),
))
}
fn generate_int32_array() -> ArrayRef {
Arc::new(Int32Array::from_iter_values(
(0..8192).map(|_| random::<i32>()),
))
}
/// Returns a `StringArray` of 8192 strings, each made of 10 `char`s sampled
/// from `rand::distributions::Standard`.
fn generate_string_array() -> ArrayRef {
    let mut strings: Vec<String> = Vec::with_capacity(8192);
    for _ in 0..8192 {
        let random_chars =
            thread_rng().sample_iter::<char, _>(rand::distributions::Standard);
        let value: String = random_chars.take(10).collect();
        strings.push(value);
    }
    Arc::new(StringArray::from(strings))
}
/// Builds one `RecordBatch` for `schema` out of freshly generated columns.
///
/// The columns are produced in this order: one Int64 column, four string
/// columns, one Int32 column, then three more string columns. An error from
/// `RecordBatch::try_new` is converted into the function's error type.
fn generate_record_batch(schema: &SchemaRef) -> Result<RecordBatch> {
    let schema = Arc::clone(schema);
    let columns = vec![
        generate_int64_array(),
        generate_string_array(),
        generate_string_array(),
        generate_string_array(),
        generate_string_array(),
        generate_int32_array(),
        generate_string_array(),
        generate_string_array(),
        generate_string_array(),
    ];
    let batch = RecordBatch::try_new(schema, columns)?;
    Ok(batch)
}
// The only aggregate: `sum_udaf()` applied to the literal `1i64`, built against
// `scan_schema` and aliased "SUM(1i64)".
let aggregate_expressions = vec![Arc::new(
AggregateExprBuilder::new(sum_udaf(), vec![lit(1i64)])
.schema(Arc::clone(&scan_schema))
.alias("SUM(1i64)")
.build()?,
)];
/// Batch source for the test: hands out randomly generated record batches
/// through its `LazyBatchGenerator` implementation.
#[derive(Debug)]
struct Generator {
// Incremented after every generated batch.
index: usize,
// Upper bound that `index` is compared against to decide when to stop.
count: usize,
// Schema handed to `generate_record_batch` for every batch.
schema: SchemaRef,
}
/// Displays every `Generator` as the fixed label `Generator`.
impl Display for Generator {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("Generator")
    }
}
impl LazyBatchGenerator for Generator {
    /// Produces the next random batch, or `None` once `index` has moved past
    /// `count`.
    ///
    /// Because a batch is still produced when `index == count`, a generator
    /// that starts at index 0 yields `count + 1` batches in total.
    fn generate_next_batch(&mut self) -> Result<Option<RecordBatch>> {
        if self.index <= self.count {
            let next = generate_record_batch(&self.schema)?;
            // Only advance once the batch was built successfully.
            self.index += 1;
            return Ok(Some(next));
        }
        Ok(None)
    }
}
// Batch source: starts at index 0 with count 10.
let generator = Generator {
index: 0,
count: 10,
schema: Arc::clone(&scan_schema),
};
// Input plan: a `LazyMemoryExec` over `scan_schema`, fed by the single generator.
let plan: Arc<dyn ExecutionPlan> =
Arc::new(LazyMemoryExec::try_new(Arc::clone(&scan_schema),
vec![Arc::new(RwLock::new(generator))])?);
// The aggregate under test, in `AggregateMode::Single`, directly on top of the
// input plan. One `None` is passed per aggregate expression (presumably the
// optional per-aggregate filter; confirm against `AggregateExec::try_new`).
let single_aggregate = Arc::new(AggregateExec::try_new(
AggregateMode::Single,
group_by,
aggregate_expressions.clone(),
vec![None; aggregate_expressions.len()],
plan,
Arc::clone(&scan_schema),
)?);
// Runtime with a `FairSpillPool` limited to 10006216 (presumably bytes, chosen
// so that the aggregation has to spill; confirm against `FairSpillPool::new`).
let memory_pool = Arc::new(FairSpillPool::new(10006216));
let task_ctx = Arc::new(
TaskContext::default().with_runtime(Arc::new(
RuntimeEnvBuilder::new()
.with_memory_pool(memory_pool)
.build()?,
)),
);
// Run the aggregate (first argument 0, presumably the partition index) and
// collect everything it produces.
let res = collect(single_aggregate.execute(0,
Arc::clone(&task_ctx))?).await;
// Print the outcome; an execution error is printed and then returned, which
// fails the test.
match res {
Ok(_) => println!("Success"),
Err(e) => {
println!("Error: {}", e);
return Err(e);
},
}
Ok(())
}
}
```
The following error happens:
```
Error: Internal error: PhysicalExpr Column references column 'col_7' at
index 7 (zero-based) but input schema only has 5 columns: ["col_1", "col_7",
"col_0", "col_8", "SUM(1i64)[sum]"]
backtrace: 0: std::backtrace_rs::backtrace::libunwind::trace
...
1: std::backtrace_rs::backtrace::trace_unsynchronized
at ...
2: std::backtrace::Backtrace::create
at ...
3: datafusion_common::error::DataFusionError::get_back_trace
at <crates>/datafusion-common-46.0.1/src/error.rs:473:30
4: datafusion_physical_expr::expressions::column::Column::bounds_check
at
<crates>/datafusion-physical-expr-46.0.1/src/expressions/column.rs:147:13
5: <datafusion_physical_expr::expressions::column::Column as
datafusion_physical_expr_common::physical_expr::PhysicalExpr>::evaluate
at
<crates>/datafusion-physical-expr-46.0.1/src/expressions/column.rs:126:9
6: datafusion_physical_plan::aggregates::evaluate_group_by::{{closure}}
at
<crates>/datafusion-physical-plan-46.0.1/src/aggregates/mod.rs:1305:25
7: core::iter::adapters::map::map_try_fold::{{closure}}
at ...
8-21: std stuff
22: core::iter::traits::iterator::Iterator::collect
at ...
23: datafusion_physical_plan::aggregates::evaluate_group_by
at
<crates>/datafusion-physical-plan-46.0.1/src/aggregates/mod.rs:1301:32
24:
datafusion_physical_plan::aggregates::row_hash::GroupedHashAggregateStream::group_aggregate_batch
at
<crates>/datafusion-physical-plan-46.0.1/src/aggregates/row_hash.rs:821:13
25:
<datafusion_physical_plan::aggregates::row_hash::GroupedHashAggregateStream as
futures_core::stream::Stream>::poll_next
at
<crates>/datafusion-physical-plan-46.0.1/src/aggregates/row_hash.rs:697:29
26-28: futures stuff
29: datafusion_physical_plan::common::collect::{{closure}}
at <crates>/datafusion-physical-plan-46.0.1/src/common.rs:45:36
30: datafusion_pg::repro_bug_agg::tests::test_debug::{{closure}}
at ./src/repro_bug_agg.rs:145:99
31-51: std and tokio stuff
52: datafusion_pg::repro_bug_agg::tests::test_debug
at ./src/repro_bug_agg.rs:147:9
53: datafusion_pg::repro_bug_agg::tests::test_debug::{{closure}}
at ./src/repro_bug_agg.rs:23:30
.
This was likely caused by a bug in DataFusion's code and we would welcome
that you file an bug report in our issue tracker
```
### Expected behavior
The aggregation should not fail.
### Additional context
The spill schema is:
```
spill_state.spill_schema: Schema {
fields: [
Field {
name: "col_1",
data_type: Utf8,
nullable: true,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "col_7",
data_type: Utf8,
nullable: true,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "col_0",
data_type: Int64,
nullable: true,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "col_8",
data_type: Utf8,
nullable: true,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "SUM(1i64)[sum]",
data_type: Int64,
nullable: true,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
],
metadata: {},
}
```
The issue is that the spill schema is the output schema of the intermediate
results, while the group by expressions stay the same. Because a `Column`
points to its input by index rather than by name, that index no longer exists
in the spill schema.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]