zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2948434379

   I have updated the reproducer in the latest revision of the PR. It is commented out for now because neither our rule nor the built-in Yield fixes this case yet.
   
   ```rust
   #[tokio::test]
   async fn test_filter_reject_all_batches_cancel() -> Result<(), Box<dyn 
Error>> {
       // 1) Create a Session, Schema, and an 8K-row RecordBatch
       let session_ctx = SessionContext::new();
       let schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
           "value",
           DataType::Int64,
           false,
       )]));
   
       // Build a batch with values 0..8191
       let mut builder = Int64Array::builder(8_192);
       for v in 0..8_192 {
           builder.append_value(v);
       }
       let batch = Arc::new(RecordBatch::try_new(
           schema.clone(),
           vec![Arc::new(builder.finish())],
       )?);
   
       // 2a) Wrap this batch in an InfiniteExec
       let infinite = Arc::new(InfiniteExec::new(&batch));
   
       // 2b) Construct a FilterExec that is always false: “value > 10000” (no 
rows pass)
       let false_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
           Arc::new(Column::new_with_schema("value", &schema)?),
           Gt,
           Arc::new(Literal::new(ScalarValue::Int64(Some(10_000)))),
       ));
       let filtered: Arc<dyn ExecutionPlan> =
           Arc::new(FilterExec::try_new(false_predicate, infinite.clone())?);
   
       // 2c) Use CoalesceBatchesExec to guarantee each Filter pull always 
yields an 8192-row batch
       let coalesced: Arc<dyn ExecutionPlan> =
           Arc::new(CoalesceBatchesExec::new(filtered.clone(), 8_192));
   
       // 2d) Hash-repartition into 1 partition (so that a later global 
aggregation would run on a single partition)
       let exprs: Vec<Arc<dyn PhysicalExpr>> =
           vec![Arc::new(Column::new_with_schema("value", &schema)?)];
       let part = Partitioning::Hash(exprs.clone(), 1);
       let hashed: Arc<dyn ExecutionPlan> =
           Arc::new(RepartitionExec::try_new(coalesced.clone(), part.clone())?);
   
       // 4) WrapLeaves to insert YieldExec—so that the InfiniteExec yields 
control between batches
       let config = ConfigOptions::new();
       let optimized = WrapLeaves::new().optimize(hashed, &config)?;
   
       // 5) Execute with a 1-second timeout. Because Filter discards all 8192 
rows each time
       //    without ever producing output, no batch will arrive within 1 
second. And since
       //    emission type is not Final, we never see an end‐of‐stream marker.
       let mut stream = physical_plan::execute_stream(optimized, 
session_ctx.task_ctx())?;
       const TIMEOUT: u64 = 1;
       let result = select! {
           batch_opt = stream.next() => batch_opt,
           _ = tokio::time::sleep(tokio::time::Duration::from_secs(TIMEOUT)) => 
{
               None
           }
       };
       assert!(
           result.is_none(),
           "Expected no output for infinite + filter(all-false) + aggregate, 
but got a batch"
       );
       Ok(())
   }
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to