zhuqi-lucas commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2963178882
> > But most other operators (Projection, Filter, HashAggregate, HashJoin,
WindowAgg, simple TableScans, etc.) do not use an MPSC channel.
> > That means we still need to insert explicit yield points
(YieldStream/PollBudget) to avoid starving the thread.
>
> You're indeed 100% dependent on your child streams which is what makes the
current solution somewhat brittle. If that happens to use a Receiver (or some
other implementation that consumes budget) it will work. If it's some other
stream that does not you may have issues again. Because the sources are user
definable, I think it's wise to take a defensive stance in the implementation
of operators and assume you don't know what they will or will not do. The
current implementation attempts to fix this by ensuring the sources have yield
points. That breaks when streams are swapped dynamically because you no longer
have a way to ensure they contain the necessary yield points. This is a point
the DataFusion library cannot currently intercept. The current implementation
with the non-task-wise budget also breaks when an intermediate operator uses
`select!` (or something similar where you read from whatever stream happens to
be ready) since this can obscure the Pending result from a stream.
There's no way to guarantee that Pending bubbles all the way up.
>
> > I believe there is no major difference? Please correct me if I am wrong.
>
> The point of contention was where you put these yield points. Do you
instrument all leaf nodes, or do you instrument consumers that may refuse to
yield? To make the system robust I really think you need to do this in the
consumers. It's also beneficial for locality of reasoning. You can look at the
implementation of an operator and assess that it's correct from a cooperative
scheduling point of view without having to look at any other code. The
objection was that there are many, many operators out there in the wild
downstream of DataFusion. That's one that I do not have an answer for. How many
people are building custom pipeline blocking operators?
>
> It's important to note that you would only need to take action in
operators where you can see from the implementation that it may not return
_any_ value, either Ready or Pending, relatively quickly. That's basically
anything that loops over input streams an unbounded number of times.
>
> * Project (or any other simple transformation operator) doesn't need to do
anything since it takes one record batch in and immediately emits another one.
> * Table scans shouldn't either. They'll yield naturally if their input is
not ready, and otherwise they'll return a RecordBatch.
> * Filter in theory should not do anything, the exception being dropping
lots of batches entirely.
> * Joins depend. A build/probe style implementation probably should
consume during build, not during probe. But it depends on the implementation.
> * Aggregation and sorting do need to consume since those can block for an
extended period of time.
Thank you, I think I get your point. I was thinking of optimizing the rule
along these lines; is that similar to what you mean?
```rust
// traverse all nodes, not just leaves
plan.transform_down(|plan| {
    // wrap if leaf OR long-running
    if plan.children().is_empty() || is_long_running(plan.as_ref()) {
        // use existing cooperative variant if available
        let wrapped = plan
            .clone()
            .with_cooperative_yields()
            .unwrap_or_else(||
                Arc::new(YieldStreamExec::new(Arc::clone(&plan), yield_period)));
        Ok(Transformed::new(wrapped, true, TreeNodeRecursion::Jump))
    } else {
        Ok(Transformed::no(plan))
    }
})
.map(|t| t.data)
```
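`is_long_running` is not defined in the snippet above. A minimal sketch of what it might look like follows, assuming the decision can be made from the operator name alone. The `PlanNode` trait and `Node` struct are hypothetical stand-ins so the example compiles without DataFusion, and the operator names in the list are illustrative rather than a complete or verified set:

```rust
/// Hypothetical stand-in for the small part of an execution plan node
/// that this sketch needs (only its name).
trait PlanNode {
    fn name(&self) -> &str;
}

/// Hypothetical helper: true for operators that may loop over their input
/// many times before returning either Ready or Pending.
fn is_long_running(plan: &dyn PlanNode) -> bool {
    // Illustrative list only; a real rule would need to be kept in sync
    // with the operators that actually block on their input.
    const LONG_RUNNING: [&str; 4] =
        ["AggregateExec", "SortExec", "HashJoinExec", "WindowAggExec"];
    LONG_RUNNING.iter().any(|n| *n == plan.name())
}

/// Minimal node type used to exercise the helper.
struct Node(&'static str);

impl PlanNode for Node {
    fn name(&self) -> &str {
        self.0
    }
}
```

A name-based check is the simplest option; a downcast-based check or a dedicated method on the plan trait would be alternatives, each with different trade-offs for custom operators defined downstream.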
1. Leaf-only wrapping can be bypassed if someone plugs in a custom Stream or
uses `select!`‑style combinators.
2. By also wrapping every consumer that does heavy looping (aggregations,
sorts, joins, window functions), you guarantee that no matter how the streams
are composed, there’s always an explicit YieldStreamExec (or the built‑in
cooperative variant) in the path. (This could be optimized to use PollBudget,
if possible.)
3. We still avoid unnecessary overhead on “simple” operators like Projection
or basic TableScan, because they’re neither leaves with no loops nor in your
“long‑running” list.
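To make the consumer-side idea concrete, here is a minimal std-only sketch of a blocking consumer that gives control back after a fixed amount of work per poll. It is not DataFusion's YieldStream or PollBudget; `BudgetedSum`, its `budget` field, `NoopWaker`, and `run_counting_yields` are all hypothetical names used only for illustration:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical blocking consumer: it must see its whole input before it can
/// produce a result, much like an aggregation or a sort.
struct BudgetedSum<I> {
    input: I,
    total: u64,
    /// Maximum number of items consumed per poll (assumed tuning knob).
    budget: usize,
}

impl<I: Iterator<Item = u64> + Unpin> Future for BudgetedSum<I> {
    type Output = u64;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u64> {
        let this = self.get_mut();
        for _ in 0..this.budget {
            match this.input.next() {
                Some(v) => this.total += v,
                None => return Poll::Ready(this.total),
            }
        }
        // Budget exhausted: request another poll, then return Pending so
        // the scheduler can run other tasks in between.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Waker that does nothing; the driver below simply polls again.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls `fut` to completion and returns its output together with the
/// number of times it returned Pending.
fn run_counting_yields<F: Future + Unpin>(mut fut: F) -> (F::Output, usize) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut yields = 0;
    loop {
        match Pin::new(&mut fut).poll(&mut cx) {
            Poll::Ready(out) => return (out, yields),
            Poll::Pending => yields += 1,
        }
    }
}
```

Because the yield lives inside the consumer's own `poll`, it does not depend on what the input does, which is the locality argument made in the quoted reply.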
Thanks!