haohuaijin opened a new issue, #21459:
URL: https://github.com/apache/datafusion/issues/21459
### Describe the bug
```text
ProjectionExec: expr=[timestamp@0 as timestamp, tokens@1 as tokens]
  FilterExec: timestamp@0 >= 1775570460000000, projection=[timestamp@0, tokens@1, service_name@2]
    DataSourceExec: partitions=1, partition_sizes=[1]
```
Applying `ProjectionPushdown` to the plan above causes a panic:
```text
thread 'main' (13911523) panicked at examples/query.rs:92:10:
called `Result::unwrap()` on an `Err` value: ArrowError(SchemaError("project index 2 out of bounds, max field 2"), Some(""))
```
### To Reproduce
```rust
use std::sync::Arc;
use arrow::array::{Int64Array, RecordBatch, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::util::pretty::pretty_format_batches;
use datafusion::common::config::ConfigOptions;
use datafusion::execution::TaskContext;
use datafusion::physical_expr::expressions::{BinaryExpr, Literal, col};
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_optimizer::projection_pushdown::ProjectionPushdown;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::displayable;
use datafusion::physical_plan::filter::FilterExecBuilder;
use datafusion::physical_plan::projection::{ProjectionExec, ProjectionExpr};
use datafusion_common::ScalarValue;
use datafusion_expr::Operator;
use futures::TryStreamExt;
#[tokio::main]
async fn main() {
// Schema: [timestamp, tokens, service_name]
let schema = Arc::new(Schema::new(vec![
Field::new("timestamp", DataType::Int64, false),
Field::new("tokens", DataType::Int64, false),
Field::new("service_name", DataType::Utf8, false),
]));
// Sample data
let timestamps = vec![1775570460000000i64];
let tokens = vec![100i64];
let services = vec!["service-a"];
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int64Array::from(timestamps)),
Arc::new(Int64Array::from(tokens)),
Arc::new(StringArray::from(services)),
],
)
.unwrap();
// DataSource: MemoryExec with projection=[timestamp, tokens,
service_name]
let memory_exec =
datafusion::catalog::memory::MemorySourceConfig::try_new_exec(
&[vec![batch]],
schema.clone(),
None,
)
.unwrap();
// Filter predicate: timestamp >= 1775570460000000
let predicate = Arc::new(BinaryExpr::new(
col("timestamp", &memory_exec.schema()).unwrap(),
Operator::GtEq,
Arc::new(Literal::new(ScalarValue::Int64(Some(1775570460000000)))),
));
// FilterExec: predicate with projection=[timestamp@0, tokens@1,
service_name@2]
let filter_exec = Arc::new(
FilterExecBuilder::new(predicate, memory_exec)
.apply_projection(Some(vec![0, 1, 2]))
.unwrap()
.build()
.unwrap(),
);
// ProjectionExec: expr=[timestamp@0 as timestamp, tokens@1 as tokens]
let proj_exprs = vec![
ProjectionExpr {
expr: col("timestamp", &filter_exec.schema()).unwrap(),
alias: "timestamp".to_string(),
},
ProjectionExpr {
expr: col("tokens", &filter_exec.schema()).unwrap(),
alias: "tokens".to_string(),
},
];
let projection_exec =
Arc::new(ProjectionExec::try_new(proj_exprs, filter_exec).unwrap());
// Print the plan before optimization
let display = displayable(projection_exec.as_ref() as &dyn
ExecutionPlan);
println!("before pushdown plan:\n{}", display.indent(true));
// Apply projection pushdown optimization
let config = ConfigOptions::default();
let optimized = ProjectionPushdown::new()
.optimize(
Arc::clone(&projection_exec) as Arc<dyn ExecutionPlan>,
&config,
)
.unwrap();
// Print the plan after optimization
let display = displayable(optimized.as_ref());
println!("after pushdown plan:\n{}", display.indent(true));
// Execute and print results before pushdown
let task_ctx = Arc::new(TaskContext::default());
let stream = projection_exec.execute(0, Arc::clone(&task_ctx)).unwrap();
let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
println!(
"before pushdown result:\n{}",
pretty_format_batches(&batches).unwrap()
);
// Execute and print results after pushdown
let stream = optimized.execute(0, Arc::clone(&task_ctx)).unwrap();
let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
println!(
"after pushdown result:\n{}",
pretty_format_batches(&batches).unwrap()
);
}
```
### Expected behavior
It should work fine: the optimized plan should execute and return the same results as the original plan.
### Additional context
It works in DataFusion v52, but reports an error in v53.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]