haohuaijin opened a new issue, #21459:
URL: https://github.com/apache/datafusion/issues/21459

   ### Describe the bug
   
   ProjectionExec: expr=[timestamp@0 as timestamp, tokens@1 as tokens]
     FilterExec: timestamp@0 >= 1775570460000000, projection=[timestamp@0, tokens@1, service_name@2]
       DataSourceExec: partitions=1, partition_sizes=[1]
   
   Applying `ProjectionPushdown` to the plan above causes a panic:
   
   thread 'main' (13911523) panicked at examples/query.rs:92:10:
   called `Result::unwrap()` on an `Err` value: ArrowError(SchemaError("project index 2 out of bounds, max field 2"), Some(""))
   
   ### To Reproduce
   
   ```rust
   use std::sync::Arc;
   
   use arrow::array::{Int64Array, RecordBatch, StringArray};
   use arrow::datatypes::{DataType, Field, Schema};
   use arrow::util::pretty::pretty_format_batches;
   use datafusion::common::config::ConfigOptions;
   use datafusion::execution::TaskContext;
   use datafusion::physical_expr::expressions::{BinaryExpr, Literal, col};
   use datafusion::physical_optimizer::PhysicalOptimizerRule;
   use datafusion::physical_optimizer::projection_pushdown::ProjectionPushdown;
   use datafusion::physical_plan::ExecutionPlan;
   use datafusion::physical_plan::displayable;
   use datafusion::physical_plan::filter::FilterExecBuilder;
   use datafusion::physical_plan::projection::{ProjectionExec, ProjectionExpr};
   use datafusion_common::ScalarValue;
   use datafusion_expr::Operator;
   use futures::TryStreamExt;
   
   #[tokio::main]
   async fn main() {
       // Schema: [timestamp, tokens, service_name]
       let schema = Arc::new(Schema::new(vec![
           Field::new("timestamp", DataType::Int64, false),
           Field::new("tokens", DataType::Int64, false),
           Field::new("service_name", DataType::Utf8, false),
       ]));
   
       // Sample data
       let timestamps = vec![1775570460000000i64];
       let tokens = vec![100i64];
       let services = vec!["service-a"];
   
       let batch = RecordBatch::try_new(
           schema.clone(),
           vec![
               Arc::new(Int64Array::from(timestamps)),
               Arc::new(Int64Array::from(tokens)),
               Arc::new(StringArray::from(services)),
           ],
       )
       .unwrap();
   
       // DataSource: MemoryExec with projection=[timestamp, tokens, 
service_name]
       let memory_exec = 
datafusion::catalog::memory::MemorySourceConfig::try_new_exec(
           &[vec![batch]],
           schema.clone(),
           None,
       )
       .unwrap();
   
       // Filter predicate: timestamp >= 1775570460000000
       let predicate = Arc::new(BinaryExpr::new(
           col("timestamp", &memory_exec.schema()).unwrap(),
           Operator::GtEq,
           Arc::new(Literal::new(ScalarValue::Int64(Some(1775570460000000)))),
       ));
   
       // FilterExec: predicate with projection=[timestamp@0, tokens@1, 
service_name@2]
       let filter_exec = Arc::new(
           FilterExecBuilder::new(predicate, memory_exec)
               .apply_projection(Some(vec![0, 1, 2]))
               .unwrap()
               .build()
               .unwrap(),
       );
   
       // ProjectionExec: expr=[timestamp@0 as timestamp, tokens@1 as tokens]
       let proj_exprs = vec![
           ProjectionExpr {
               expr: col("timestamp", &filter_exec.schema()).unwrap(),
               alias: "timestamp".to_string(),
           },
           ProjectionExpr {
               expr: col("tokens", &filter_exec.schema()).unwrap(),
               alias: "tokens".to_string(),
           },
       ];
       let projection_exec =
           Arc::new(ProjectionExec::try_new(proj_exprs, filter_exec).unwrap());
   
       // Print the plan before optimization
       let display = displayable(projection_exec.as_ref() as &dyn 
ExecutionPlan);
       println!("before pushdown plan:\n{}", display.indent(true));
   
       // Apply projection pushdown optimization
       let config = ConfigOptions::default();
       let optimized = ProjectionPushdown::new()
           .optimize(
               Arc::clone(&projection_exec) as Arc<dyn ExecutionPlan>,
               &config,
           )
           .unwrap();
   
       // Print the plan after optimization
       let display = displayable(optimized.as_ref());
       println!("after pushdown plan:\n{}", display.indent(true));
   
       // Execute and print results before pushdown
       let task_ctx = Arc::new(TaskContext::default());
       let stream = projection_exec.execute(0, Arc::clone(&task_ctx)).unwrap();
       let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
       println!(
           "before pushdown result:\n{}",
           pretty_format_batches(&batches).unwrap()
       );
   
       // Execute and print results after pushdown
       let stream = optimized.execute(0, Arc::clone(&task_ctx)).unwrap();
       let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
       println!(
           "after pushdown result:\n{}",
           pretty_format_batches(&batches).unwrap()
       );
   }
   
   ```
   
   ### Expected behavior
   
   It should work: the optimized plan should execute without error and return the same result as the unoptimized plan.
   
   ### Additional context
   
   It works in DataFusion v52, but reports this error in v53.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to