pepijnve commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2969344331

   > pipeline mode and pipeline breaking
   
   I'm starting to realize we might have been placing too much emphasis on this 
aspect. I've been doing my homework by reading the Volcano paper. I had never 
read that paper in depth (never has a need to), I just knew that people used 
the term to kind of refer to 'the iterator approach'. The more I read the more 
I can see DataFusion is basically a modern day Volcano.
   
   One thing DataFusion does not have explicitly, as far as I know, is an 
exchange operator. I say not explicitly, because the essential 
demand/data-driven switch part of exchange is present in a couple of operators 
like Repartition and Coalesce. Perhaps these dataflow change points are a 
better way of looking at the problem.
   
   I really miss having a white board, but here's an approximation :smiling: 
This is from the Volcano paper (with annotations by me of course).
   
   <img width="275" alt="Image" 
src="https://github.com/user-attachments/assets/76518a8f-f2fc-4def-b206-8a3fe00b3dff";
 />
   
   Each of the colored blocks is an independently executing sub portion of the 
query. Translated to Tokio each of these colored blocks is a separate 
concurrent task. Each of those tasks needs to be cooperatively scheduled to 
guarantee all of them get a fair share of time to run.
   
   As we've concluded earlier the output side of the exchange-like operators is 
already handling this for us implicitly because they consume tokio task budget. 
The table sources (Scan in the image) do not.
   
   Perhaps this reframing of the problem is the path to a general purpose 
solution. To verify correct scheduling behavior, you can first subdivide the 
plan into subplans using the exchange-like operators as cut points. Per sub 
plan you can then look at all the leave nodes. Each leave node that 'inserts' 
work into the task needs to consume from the same task-wide tokio budget, not a 
per operator budget as we're doing today.
   
   So what does all this mean in terms of implementation:
   
   - Replace the per operator counters with consuming the Tokio task budget. 
DataFusion is already doing this today so there's precedent for it, and it 
resolves a bunch of side effects. I've opened a PR in tokio to allow us to use 
the necessary API for this https://github.com/tokio-rs/tokio/pull/7405. I think 
we can approximate `poll_proceed` with a combination of 'has budget' and 
'consume budget' in the meantime.
   - Remove the configuration option
   - Consider renaming YieldStream to CooperativeStream.
   - I think I would prefer a declarative property on `ExecutionPlan` that 
communicates if an operator consumes the task budget (not sure what the best 
description of this would be) instead of `with_cooperative_yielding`. It's not 
really something you want to opt-in to after all and the exchange-like 
operators have no way of opting out.
   
   The one thing that we still cannot solve automatically then is dynamic query 
planning. Operators that create streams dynamically still have to make sure 
they set things up correctly themselves.
   
   One possible downside to this approach is that the cooperative scheduling 
budget is implementation specific to the Tokio runtime. DataFusion becomes more 
tied to Tokio rather than less. Not sure if that's an issue or not.
   
   @alamb @ozankabak wdyt? Maybe this is what you were going for all along and 
I'm just slowly catching up :smiling:
   
   The change of heart comes from the realization that Tokio itself also takes 
a 'consume at the leaves' strategy and having a task wide budget ensures that 
tasks cannot silently ignore the yield request. Once one resource depletes the 
budget, it's no longer possible to make progress anywhere else provided all 
resource participate in the budgeting system.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to