robtandy opened a new pull request, #60:
URL: https://github.com/apache/datafusion-ray/pull/60

   @andygrove, per our collaboration around this, here is the requested PR against main. Tagging @alamb here also for his additional insight and perspective on query execution strategies, and to follow up on the presentation given at the DataFusion Community Meeting.
   
   ## TL;DR
   
   This PR contains a pull-based stage execution model where stages are connected using Arrow Flight. It's simpler and more performant than previous iterations of the streaming rewrite. It works on TPCH SF100 and below; above that has not been tested, though I think it should parallelize well at the expense of execution overhead.
   
   ## Evolution of this work
   
   This represents the third iteration of attempts to stream data between 
stages.   A brief accounting of those efforts might be useful to capture here:
   
   1. The first attempt was to use the Ray Object Store to stream batches between stages. This was a challenge for two reasons. The first was that under high throughput of potentially small items, the object store added too much latency to query processing. The second, and the reason this approach was abandoned, is that creating a shuffle writer exec plan node that jumped from Rust to Python to interact with the object store, and potentially called back into Rust, proved difficult to manage and reason about.
   
   2. The second attempt, which I have discussed on Discord, was to adopt Arrow Flight for streaming between stages and flip the execution of the stages from pull to push. The thinking was to have each stage eagerly execute and stream batches to an `Exchange` Actor, which would hold a bunch of channels (num stages x num partitions per stage) and allow subsequent stages to consume from them.
   
       The problems here were that the `Exchange` Actor was difficult to tune and created an additional Arrow Flight hop. Another challenge was that DataFusion is inherently a pull-based architecture, which makes it very easy to compose and reason about. Flipping this was like swimming upstream and resulted in a lot of complications that DataFusion already manages elegantly.
   
       While it was interesting to consider push execution, and it may inform future work to consume from streams and materialize query results, ultimately it meant reimplementing a lot of things that DataFusion just makes easy.
   
   3. The third attempt, this iteration, is purely pull-based and uses Arrow Flight to stream between stages. This turned out to produce the smallest amount of code, and code that was easy to work with and debug. It's as if you are executing DataFusion locally, but some of the execution nodes are connected with Arrow Flight instead of channels.
   
   There is more that can be improved, both in performance and in ergonomics, but this is quite usable as it is, and it will allow others to see the work and collaborate.
   
   For examples, see the main README, `examples/tips.py`, and `tpch/tpc.py`.
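
   To give a flavor of the API, here is a hedged sketch of a minimal script; aside from `RayContext` itself, the import path and method names are assumptions, and the example files above are authoritative:

   ```python
   import ray
   from datafusion_ray import RayContext  # import path is an assumption

   ray.init()

   # Constructor and method names below are illustrative; they mirror the
   # tunables described in this PR and common DataFusion Python conventions.
   ctx = RayContext(concurrency=2, batch_size=8192)
   ctx.register_parquet("tips", "./examples/tips.parquet")
   df = ctx.sql("SELECT sex, smoker, AVG(tip) FROM tips GROUP BY sex, smoker")
   df.show()
   ```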
   
   ## Execution Strategy
   DataFusion Ray will optimize a query plan and then break it into stages. Those stages will be scheduled as Ray Actors and make their partition streams available over Arrow Flight.
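
   To make the serving side concrete, here is a minimal sketch assuming a `pyarrow.flight` server per stage; the class and variable names are illustrative, not the PR's actual code:

   ```python
   import pyarrow.flight as flight

   # Illustrative sketch, not the PR's code: a stage serving its output
   # partitions over Arrow Flight. Each partition id maps to a
   # pyarrow.RecordBatchReader producing that partition's stream.
   class StageFlightServer(flight.FlightServerBase):
       def __init__(self, location, partition_readers):
           super().__init__(location)
           self._partitions = partition_readers

       def do_get(self, context, ticket):
           # The ticket identifies which partition of this stage is wanted.
           partition = int(ticket.ticket.decode())
           return flight.RecordBatchStream(self._partitions[partition])

   # Hosting (illustrative): each such server would live inside a Ray Actor.
   # server = StageFlightServer("grpc://0.0.0.0:50051", readers)
   # server.serve()
   ```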
   
   Connecting one stage to another means adding a `RayStageReaderExec` node within the downstream stage wherever a connection is required; at execution time, that node fetches the stream from the upstream stage using a `FlightClient`.
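
   Conceptually, the reader side reduces to the following sketch; the address and ticket encoding are assumptions, and the real plan node integrates this into DataFusion's streaming execution rather than a bare loop:

   ```python
   import pyarrow.flight as flight

   # Illustrative sketch of what a RayStageReaderExec conceptually does:
   # pull one upstream partition over Flight, batch by batch, on demand.
   client = flight.connect("grpc://127.0.0.1:50051")  # upstream stage's address
   reader = client.do_get(flight.Ticket(b"3"))        # request partition 3
   for chunk in reader:
       batch = chunk.data  # a pyarrow.RecordBatch, produced only as it is pulled
   ```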
   
   ## Tunables
    * `--isolate` By default, DataFusion Ray will host each stage as its own actor. This flag (in the examples, and a parameter to the `RayContext`) will tell DataFusion Ray to host _each partition of each stage_ as its own Actor. This dramatically increases parallelism, but it is a blunt instrument; a more finely tuned choice (like splitting a stage into x parts) would be more desirable, and can be added in a future update.
    * `--concurrency` This controls the partition count for all stages, and partitioning is planned using DataFusion before control is submitted to Ray. It interacts with `--isolate`: with isolation enabled, each of the `concurrency` partitions of each stage becomes its own Actor.
    * `--batch-size` This controls the target (and also maximum) batch size exchanged between stages. Currently 8192 works for all queries in TPCH SF100. Going higher can produce Flight errors as we exceed the maximum Flight payload size.
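
   For instance, a TPCH run exercising these knobs might look like the following; the exact flags accepted by `tpch/tpc.py` may differ, so treat this invocation as illustrative:

   ```bash
   python tpch/tpc.py --concurrency 16 --batch-size 8192 --isolate
   ```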

