zhuqi-lucas commented on issue #16353: URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2962673909
> Tokio's cooperative budget is essentially a counter per task that can be decremented at any point in the task. When the counter hits zero you'll return Pending from the function trying to consume the budget. That's basically what YieldStream is doing but with a local counter rather than a task wide one.
>
> DataFusion's `ReceiverStreamBuilder` makes use of `tokio::sync::mpsc::Receiver`. Whenever `Receiver::recv` is being called, that counter is being decremented, and you'll get a Pending result when the budget is depleted. This is the same thing as what YieldStream is trying to do.
>
> The benefits I see of trying to leverage the same mechanism elsewhere in DataFusion are:
>
> * There is only one cooperative yielding mechanism at play. This is easier to reason about than multiple interacting ones.
> * There is no need for additional API. DataFusion is already using this in the current released version.
> * There are fewer corner cases. Once the budget is depleted, any point in the code checking the budget will yield since all those points are checking the same shared counter.
>
> The downsides remain:
>
> * Code that loops may still need to have yield points added to it in order to not yield unnecessarily.
> * It's not yet 100% clear to me how you can use this in manually written Futures and Streams. The required bits for that seem to only be crate visible in the current Tokio release. I've raised the question here [Example of using cooperative scheduling budget in manual Future/Stream implementations tokio-rs/tokio#7403](https://github.com/tokio-rs/tokio/issues/7403)
> * I have not made a performance analysis of this yet, but since it's used quite extensively already it's likely to be ok. Needs to be evaluated.

You’re right. That comment overstates things.
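To make the "local counter" idea from the quoted comment concrete, here is a minimal std-only sketch of a wrapper that returns Pending after a fixed number of consecutive ready items. `PollNext`, `AlwaysReady`, `LocalYield`, `NoopWake`, and `drive` are hypothetical names invented for this illustration. They are not DataFusion or Tokio APIs, and the real YieldStream may well differ in its details.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a stream trait, so the sketch needs only std.
trait PollNext {
    type Item;
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// An input that is always ready and never yields on its own.
struct AlwaysReady {
    remaining: usize,
}

impl PollNext for AlwaysReady {
    type Item = usize;
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<usize>> {
        if self.remaining == 0 {
            return Poll::Ready(None);
        }
        self.remaining -= 1;
        Poll::Ready(Some(self.remaining))
    }
}

/// Local-counter yielding (assumed behavior): after `budget` consecutive
/// ready items, return Pending once. `budget` must be greater than zero.
struct LocalYield<S> {
    inner: S,
    budget: usize,
    used: usize,
}

impl<S: PollNext> PollNext for LocalYield<S> {
    type Item = S::Item;
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
        if self.used >= self.budget {
            self.used = 0;
            // Request another poll so the caller does not stall.
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        let result = self.inner.poll_next(cx);
        match &result {
            Poll::Ready(Some(_)) => self.used += 1,
            // The inner stream yielded or finished: start counting afresh.
            _ => self.used = 0,
        }
        result
    }
}

/// Waker that does nothing; `drive` below simply polls again in a loop.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls `stream` to completion and returns (items seen, Pending results seen).
fn drive<S: PollNext>(mut stream: S) -> (usize, usize) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let (mut items, mut yields) = (0, 0);
    loop {
        match stream.poll_next(&mut cx) {
            Poll::Ready(Some(_)) => items += 1,
            Poll::Ready(None) => return (items, yields),
            Poll::Pending => yields += 1,
        }
    }
}
```

Called from a `main` function, `drive(AlwaysReady { remaining: 10 })` never sees a Pending result, while wrapping the same input in `LocalYield` with `budget: 4` makes the wrapper hand control back twice. The counter here lives inside the wrapper, so two such wrappers in the same task would count independently, which is the contrast with a task-wide budget that the quoted comment draws.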
In DataFusion, only operators that fan out work into multiple spawned tasks and then re-aggregate the results via a Tokio MPSC channel consume the cooperative budget automatically (because each `Receiver::recv().await` call decrements it). Examples include:

```rust
CoalescePartitionsExec
SortPreservingMergeExec
```

Both of those use `RecordBatchReceiverStreamBuilder::run_input`, whose `.next().await` is really `rx.recv().await` under the hood, and that is what charges the Tokio coop budget.

But most other operators (Projection, Filter, HashAggregate, HashJoin, WindowAgg, simple TableScans, etc.) do not use an MPSC channel. They execute pull-based within a single Stream implementation and never call `recv()`, so they don’t automatically consume any cooperative budget.

That means we still need to insert explicit yield points (YieldStream/PollBudget) in those operators to avoid starving the thread. So I believe there is no major difference for those operators?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org