zhuqi-lucas commented on PR #16196: URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2948434379
I have updated the reproducer case in the latest PR, and I commented it out because neither our rule nor the built-in yield fixes this yet.

```rust
#[tokio::test]
async fn test_filter_reject_all_batches_cancel() -> Result<(), Box<dyn Error>> {
    // 1) Create a Session, Schema, and an 8K-row RecordBatch
    let session_ctx = SessionContext::new();
    let schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Int64,
        false,
    )]));

    // Build a batch with values 0..8191
    let mut builder = Int64Array::builder(8_192);
    for v in 0..8_192 {
        builder.append_value(v);
    }
    let batch = Arc::new(RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(builder.finish())],
    )?);

    // 2a) Wrap this batch in an InfiniteExec
    let infinite = Arc::new(InfiniteExec::new(&batch));

    // 2b) Construct a FilterExec that is always false: “value > 10000” (no rows pass)
    let false_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::new(Column::new_with_schema("value", &schema)?),
        Gt,
        Arc::new(Literal::new(ScalarValue::Int64(Some(10_000)))),
    ));
    let filtered: Arc<dyn ExecutionPlan> =
        Arc::new(FilterExec::try_new(false_predicate, infinite.clone())?);

    // 2c) Use CoalesceBatchesExec to guarantee each Filter pull always yields an 8192-row batch
    let coalesced: Arc<dyn ExecutionPlan> =
        Arc::new(CoalesceBatchesExec::new(filtered.clone(), 8_192));

    // 2d) Hash-repartition into 1 partition (so that a later global aggregation would run on a single partition)
    let exprs: Vec<Arc<dyn PhysicalExpr>> =
        vec![Arc::new(Column::new_with_schema("value", &schema)?)];
    let part = Partitioning::Hash(exprs.clone(), 1);
    let hashed: Arc<dyn ExecutionPlan> =
        Arc::new(RepartitionExec::try_new(coalesced.clone(), part.clone())?);

    // 4) WrapLeaves to insert YieldExec—so that the InfiniteExec yields control between batches
    let config = ConfigOptions::new();
    let optimized = WrapLeaves::new().optimize(hashed, &config)?;

    // 5) Execute with a 1-second timeout. Because Filter discards all 8192 rows each time
    // without ever producing output, no batch will arrive within 1 second. And since
    // emission type is not Final, we never see an end‐of‐stream marker.
    let mut stream = physical_plan::execute_stream(optimized, session_ctx.task_ctx())?;
    const TIMEOUT: u64 = 1;
    let result = select! {
        batch_opt = stream.next() => batch_opt,
        _ = tokio::time::sleep(tokio::time::Duration::from_secs(TIMEOUT)) => {
            None
        }
    };

    assert!(
        result.is_none(),
        "Expected no output for infinite + filter(all-false) + aggregate, but got a batch"
    );
    Ok(())
}
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org