pepijnve commented on issue #16353: URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2969344331
> pipeline mode and pipeline breaking

I'm starting to realize we might have been placing too much emphasis on this aspect. I've been doing my homework by reading the Volcano paper. I had never read that paper in depth (never had a need to); I just knew that people used the term to refer to 'the iterator approach'. The more I read, the more I can see that DataFusion is basically a modern-day Volcano.

One thing DataFusion does not have explicitly, as far as I know, is an exchange operator. I say not explicitly because the essential demand/data-driven switch part of exchange is present in a couple of operators like Repartition and Coalesce. Perhaps these dataflow change points are a better way of looking at the problem. I really miss having a whiteboard, but here's an approximation :smiling: This is from the Volcano paper (with annotations by me of course).

<img width="275" alt="Image" src="https://github.com/user-attachments/assets/76518a8f-f2fc-4def-b206-8a3fe00b3dff" />

Each of the colored blocks is an independently executing subportion of the query. Translated to Tokio, each of these colored blocks is a separate concurrent task. Each of those tasks needs to be cooperatively scheduled to guarantee that all of them get a fair share of time to run. As we concluded earlier, the output side of the exchange-like operators is already handling this for us implicitly because they consume Tokio task budget. The table sources (Scan in the image) do not.

Perhaps this reframing of the problem is the path to a general-purpose solution. To verify correct scheduling behavior, you can first subdivide the plan into subplans using the exchange-like operators as cut points. Per subplan you can then look at all the leaf nodes. Each leaf node that 'inserts' work into the task needs to consume from the same task-wide Tokio budget, not a per-operator budget as we're doing today.
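To make the verification idea concrete, here is a minimal sketch of splitting a plan at exchange-like operators and listing the leaves of each subplan. Everything in it is hypothetical and for illustration only: `PlanNode`, `SubPlan`, the `consumes_budget` flag, and the operator names are assumptions, not DataFusion's `ExecutionPlan` API. It also assumes that the output side of an exchange-like operator counts as a leaf of the consuming subplan and that each of its inputs starts a new subplan.

```rust
use std::collections::VecDeque;

/// Hypothetical, simplified plan node for illustration only;
/// this is not DataFusion's `ExecutionPlan` trait.
struct PlanNode {
    name: &'static str,
    /// Exchange-like operators (e.g. Repartition, Coalesce) are the cut points.
    exchange_like: bool,
    /// Assumed flag: does this operator consume the task-wide budget?
    consumes_budget: bool,
    children: Vec<PlanNode>,
}

/// One independently scheduled portion of the plan (one task).
#[derive(Debug, PartialEq)]
struct SubPlan {
    root: &'static str,
    leaves: Vec<&'static str>,
    /// Leaves that insert work into the task without consuming budget.
    unbudgeted_leaves: Vec<&'static str>,
}

fn visit<'a>(node: &'a PlanNode, sub: &mut SubPlan, pending: &mut VecDeque<&'a PlanNode>) {
    if node.exchange_like || node.children.is_empty() {
        // A source or the output side of an exchange: a leaf of this subplan.
        sub.leaves.push(node.name);
        if !node.consumes_budget {
            sub.unbudgeted_leaves.push(node.name);
        }
        // Each input of an exchange becomes the root of a new subplan.
        if node.exchange_like {
            pending.extend(node.children.iter());
        }
    } else {
        for child in &node.children {
            visit(child, sub, pending);
        }
    }
}

/// Subdivide the plan into subplans using exchange-like operators as cut points.
fn split_into_subplans(root: &PlanNode) -> Vec<SubPlan> {
    let mut subplans = Vec::new();
    let mut pending = VecDeque::from([root]);
    while let Some(sub_root) = pending.pop_front() {
        let mut sub = SubPlan { root: sub_root.name, leaves: vec![], unbudgeted_leaves: vec![] };
        visit(sub_root, &mut sub, &mut pending);
        subplans.push(sub);
    }
    subplans
}

fn node(name: &'static str, exchange_like: bool, consumes_budget: bool, children: Vec<PlanNode>) -> PlanNode {
    PlanNode { name, exchange_like, consumes_budget, children }
}

/// Made-up example: a join over two repartitioned inputs.
fn example_plan() -> PlanNode {
    let scan_a = node("ScanA", false, false, vec![]);
    let scan_b = node("ScanB", false, false, vec![]);
    let filter = node("Filter", false, false, vec![scan_a]);
    let exchange_a = node("RepartitionA", true, true, vec![filter]);
    let exchange_b = node("RepartitionB", true, true, vec![scan_b]);
    node("HashJoin", false, false, vec![exchange_a, exchange_b])
}

fn main() {
    for sub in split_into_subplans(&example_plan()) {
        println!("{}: leaves {:?}, unbudgeted {:?}", sub.root, sub.leaves, sub.unbudgeted_leaves);
    }
}
```

In this made-up plan the subplan rooted at the join only has exchange outputs as leaves, so it is already covered, while the two producer subplans each end in a scan that does not consume budget. Those are the leaves the check would flag.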
So what does all this mean in terms of implementation:

- Replace the per-operator counters with consuming the Tokio task budget. DataFusion is already doing this today, so there's precedent for it, and it resolves a bunch of side effects. I've opened a PR in tokio to allow us to use the necessary API for this: https://github.com/tokio-rs/tokio/pull/7405. I think we can approximate `poll_proceed` with a combination of 'has budget' and 'consume budget' in the meantime.
- Remove the configuration option.
- Consider renaming YieldStream to CooperativeStream.
- I think I would prefer a declarative property on `ExecutionPlan` that communicates whether an operator consumes the task budget (not sure what the best description of this would be) instead of `with_cooperative_yielding`. It's not really something you want to opt in to after all, and the exchange-like operators have no way of opting out.

The one thing that we still cannot solve automatically then is dynamic query planning. Operators that create streams dynamically still have to make sure they set things up correctly themselves.

One possible downside to this approach is that the cooperative scheduling budget is specific to the Tokio runtime. DataFusion becomes more tied to Tokio rather than less. Not sure if that's an issue or not.

@alamb @ozankabak wdyt? Maybe this is what you were going for all along and I'm just slowly catching up :smiling:

The change of heart comes from the realization that Tokio itself also takes a 'consume at the leaves' strategy, and having a task-wide budget ensures that tasks cannot silently ignore the yield request. Once one resource depletes the budget, it's no longer possible to make progress anywhere else, provided all resources participate in the budgeting system.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org