zhuqi-lucas commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2962673909

   > Tokio's cooperative budget is essentially a counter per task that can be 
decremented at any point in the task. When the counter hits zero you'll return 
Pending from the function trying to consume the budget. That's basically what 
YieldStream is doing but with a local counter rather than a task wide one.
   > 
   > DataFusion's `ReceiverStreamBuilder` makes use of 
`tokio::sync::mpsc::Receiver`. Whenever `Receiver::recv` is being called, that 
counter is being decremented, and you'll get a Pending result when the budget 
is depleted. This is the same thing as what YieldStream is trying to do.
   > 
   > The benefits I see of trying to leverage the same mechanism elsewhere in 
DataFusion are:
   > 
   > * There is only one cooperative yielding mechanism at play. This is easier 
to reason about than multiple interacting ones.
   > * There is no need for additional API. DataFusion is already using this in 
the current released version.
   > * There are fewer corner cases. Once the budget is depleted, any point in 
the code checking the budget will yield since all those points are checking the 
same shared counter.
   > 
   > The downsides remain:
   > 
   > * Code that loops may still need to have yield points added to it in order 
to not yield unnecessarily.
   > * It's not yet 100% clear to me how you can use this in manually written 
Futures and Streams. The required bits for that seem to only be crate visible 
in the current Tokio release. I've raised the question here [Example of using 
cooperative scheduling budget in manual Future/Stream implementations 
tokio-rs/tokio#7403](https://github.com/tokio-rs/tokio/issues/7403)
   > * I have not made a performance analysis of this yet, but since it's used 
quite extensively already it's likely to be ok. Needs to be evaluated.
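
The shared-budget idea described in the quote can be modeled without Tokio. The sketch below is a hypothetical, std-only model and not Tokio's actual API. The following names are illustrative only: `BUDGET`, `reset_budget`, `try_charge_budget` and `CountingSource`. A thread-local `Cell` stands in for the per-task counter, and a no-op waker replaces a real executor. The sketch shows the "fewer corner cases" point. Two unrelated sources check the same counter, so together they produce exactly `budget` items before the task has to yield.

```rust
use std::cell::Cell;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for a task-wide cooperative budget (NOT Tokio's API):
// one counter shared by every budget-checking point in the current task.
thread_local! {
    static BUDGET: Cell<u32> = Cell::new(0);
}

/// Called by the (hypothetical) executor before each poll of a task.
fn reset_budget(units: u32) {
    BUDGET.with(|b| b.set(units));
}

/// Consume one unit of budget, or yield if the task has none left.
fn try_charge_budget(cx: &mut Context<'_>) -> Poll<()> {
    BUDGET.with(|b| {
        if b.get() == 0 {
            // Out of budget: ask to be re-polled, then yield to the executor.
            cx.waker().wake_by_ref();
            Poll::Pending
        } else {
            b.set(b.get() - 1);
            Poll::Ready(())
        }
    })
}

/// An always-ready source that charges the shared budget for every item.
struct CountingSource {
    next: u64,
}

impl CountingSource {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u64>> {
        if try_charge_budget(cx).is_pending() {
            return Poll::Pending;
        }
        self.next += 1;
        Poll::Ready(Some(self.next))
    }
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// One "task poll" that alternates between two sources until either yields.
/// Returns how many items were produced before the first Pending.
fn run_one_poll(budget: u32) -> usize {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let (mut a, mut b) = (CountingSource { next: 0 }, CountingSource { next: 0 });
    reset_budget(budget);
    let mut produced = 0;
    loop {
        let source = if produced % 2 == 0 { &mut a } else { &mut b };
        match source.poll_next(&mut cx) {
            Poll::Ready(Some(_)) => produced += 1,
            _ => return produced,
        }
    }
}

fn main() {
    // Both sources draw from the same counter, so together they
    // produce exactly `budget` items before the task must yield.
    println!("{}", run_one_poll(128)); // prints 128
}
```

A YieldStream-style local counter behaves differently. It would let each source run up to its own limit, regardless of how much work the rest of the task had already done in that poll.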
   
   
   You're right; that comment overstates things. In DataFusion, only one kind of operator consumes the cooperative budget automatically. These operators fan out work into multiple spawned tasks and then re-aggregate the results through a Tokio MPSC channel. Each `Receiver::recv().await` call decrements the budget. Examples include:
   * `CoalescePartitionsExec`
   * `SortPreservingMergeExec`
   
   All of these use `RecordBatchReceiverStreamBuilder::run_input`. Its `.next().await` is really `rx.recv().await` under the hood, and that is what charges the Tokio coop budget.
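
The following hypothetical std-only model shows why a channel-backed stream yields on its own. `ReceiverStream` wraps a `std::sync::mpsc::Receiver`. Its `poll_next` is just a budget check followed by a receive, mirroring how `.next()` maps onto `recv()` as described above. This is neither DataFusion's `RecordBatchReceiverStream` nor Tokio's channel, and the budget helpers are assumed names. `try_recv` stands in for an async receive, and this model does not register wakers when the channel is empty.

```rust
use std::cell::Cell;
use std::sync::mpsc::{channel, Receiver, TryRecvError};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

thread_local! {
    // Hypothetical per-task budget, refilled by the "executor" before each poll.
    static BUDGET: Cell<u32> = Cell::new(0);
}

fn try_charge_budget(cx: &mut Context<'_>) -> Poll<()> {
    BUDGET.with(|b| {
        if b.get() == 0 {
            cx.waker().wake_by_ref(); // ask to be polled again later
            Poll::Pending
        } else {
            b.set(b.get() - 1);
            Poll::Ready(())
        }
    })
}

fn refund_budget() {
    BUDGET.with(|b| b.set(b.get() + 1));
}

/// Hypothetical channel-backed stream: `poll_next` is just a budgeted receive.
struct ReceiverStream<T> {
    rx: Receiver<T>,
}

impl<T> ReceiverStream<T> {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        if try_charge_budget(cx).is_pending() {
            return Poll::Pending; // yield even though data may be waiting
        }
        match self.rx.try_recv() {
            Ok(item) => Poll::Ready(Some(item)),
            Err(TryRecvError::Disconnected) => Poll::Ready(None),
            Err(TryRecvError::Empty) => {
                // No progress: this model gives the unit back. A real async
                // channel would also register `cx.waker()` with the sender.
                refund_budget();
                Poll::Pending
            }
        }
    }
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Queues `n` items, drops the sender, then drains the stream with `budget`
/// units per task poll (budget must be >= 1). Returns (items, task polls).
fn drain(n: u32, budget: u32) -> (Vec<u32>, usize) {
    let (tx, rx) = channel();
    for i in 0..n {
        tx.send(i).unwrap();
    }
    drop(tx);
    let mut stream = ReceiverStream { rx };
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let (mut out, mut polls) = (Vec::new(), 0);
    loop {
        polls += 1;
        BUDGET.with(|b| b.set(budget)); // executor refills the budget
        loop {
            match stream.poll_next(&mut cx) {
                Poll::Ready(Some(x)) => out.push(x),
                Poll::Ready(None) => return (out, polls),
                Poll::Pending => break, // task yields back to the executor
            }
        }
    }
}

fn main() {
    let (items, polls) = drain(10, 4);
    println!("{} items in {} polls", items.len(), polls); // prints "10 items in 3 polls"
}
```

The run uses 10 queued items and a budget of 4 per poll. The consumer is forced back to the executor twice, even though data is always available, and a third poll drains the rest.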
   
   But most other operators (Projection, Filter, HashAggregate, HashJoin, WindowAgg, simple TableScans, etc.) do not use an MPSC channel. They run pull-based inside a single `Stream` implementation and never call `recv()`, so they don't automatically consume any cooperative budget.
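
By contrast, a pull-based operator that loops over an always-ready input never returns `Pending` on its own. The sketch below uses hypothetical `MemorySource` and `FilterStream` types. These are simplified stand-ins, not DataFusion's `FilterExec` stream. With a predicate that never matches, a single poll pulls every input row before returning.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Always-ready stand-in input (e.g. an in-memory scan). It checks no budget.
struct MemorySource {
    next: u64,
    end: u64,
}

impl MemorySource {
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<u64>> {
        if self.next == self.end {
            return Poll::Ready(None);
        }
        self.next += 1;
        Poll::Ready(Some(self.next - 1))
    }
}

/// Hypothetical pull-based filter: keeps pulling until a row matches.
struct FilterStream {
    input: MemorySource,
    predicate: fn(u64) -> bool,
    pulled: u64, // rows pulled from the input so far
}

impl FilterStream {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u64>> {
        loop {
            match self.input.poll_next(cx) {
                Poll::Ready(Some(v)) => {
                    self.pulled += 1;
                    if (self.predicate)(v) {
                        return Poll::Ready(Some(v));
                    }
                    // No match: keep looping; nothing here ever returns Pending.
                }
                // End of input (or Pending, which this input never produces).
                other => return other,
            }
        }
    }
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls the filter exactly once; returns the result and the rows pulled.
fn single_poll(rows: u64) -> (Poll<Option<u64>>, u64) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut filter = FilterStream {
        input: MemorySource { next: 0, end: rows },
        predicate: |_| false, // a selective predicate that matches nothing
        pulled: 0,
    };
    let result = filter.poll_next(&mut cx);
    (result, filter.pulled)
}

fn main() {
    let (result, pulled) = single_poll(1_000_000);
    // prints "Ready(None) after pulling 1000000 rows in one poll"
    println!("{:?} after pulling {} rows in one poll", result, pulled);
}
```

Nothing in this loop checks a budget. The only bound on the work done in one poll is therefore the size of the input.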
   
   That means we still need to insert explicit yield points (YieldStream/PollBudget) to avoid starving the thread.
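
An explicit yield point can be as simple as a counter local to one stream. The sketch below is a hypothetical simplification in the spirit of YieldStream, not DataFusion's implementation. `LocalYieldStream` returns `Pending`, after waking itself, once every `period` items. `MemorySource` is an assumed always-ready stand-in input.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Always-ready stand-in input (e.g. an in-memory scan); never yields.
struct MemorySource {
    next: u64,
    end: u64,
}

impl MemorySource {
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<u64>> {
        if self.next == self.end {
            return Poll::Ready(None);
        }
        self.next += 1;
        Poll::Ready(Some(self.next - 1))
    }
}

/// Hypothetical explicit yield point with a counter local to this stream.
struct LocalYieldStream {
    input: MemorySource,
    period: u32,
    since_yield: u32,
}

impl LocalYieldStream {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u64>> {
        if self.since_yield >= self.period {
            self.since_yield = 0;
            // Still runnable: wake ourselves so the executor re-polls us
            // after giving other tasks a chance to run.
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        let item = self.input.poll_next(cx);
        if let Poll::Ready(Some(_)) = item {
            self.since_yield += 1;
        }
        item
    }
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Drains `rows` items; returns (items, number of times the stream yielded).
fn drain(rows: u64, period: u32) -> (Vec<u64>, usize) {
    assert!(period > 0, "period must be at least 1");
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut stream = LocalYieldStream {
        input: MemorySource { next: 0, end: rows },
        period,
        since_yield: 0,
    };
    let (mut out, mut yields) = (Vec::new(), 0);
    loop {
        match stream.poll_next(&mut cx) {
            Poll::Ready(Some(v)) => out.push(v),
            Poll::Ready(None) => return (out, yields),
            // A real executor would run other tasks before re-polling.
            Poll::Pending => yields += 1,
        }
    }
}

fn main() {
    let (items, yields) = drain(10, 4);
    println!("{} items, {} yields", items.len(), yields); // prints "10 items, 2 yields"
}
```

The counter is private to this wrapper, so it yields on a fixed schedule regardless of how much budget other parts of the task have consumed. That is the trade-off against a shared task-wide budget discussed in the quoted comment.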
   
   So I believe it makes no major difference here?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

