robtandy opened a new pull request, #60: URL: https://github.com/apache/datafusion-ray/pull/60
@andygrove, per our collaboration around this, here is the requested PR against main. Tagging @alamb here also for his additional insight and perspective on query execution strategies, and to follow up on the presentation given at the DataFusion Community Meeting.

## TL;DR

This PR contains a pull-based stage execution model in which stages are connected using Arrow Flight. It's simpler and more performant than previous iterations of the streaming rewrite. It works on TPCH SF100 and below. Larger scale factors have not been tested, though I think it should parallelize well at the expense of execution overhead.

## Evolution of this work

This is the third iteration of attempts to stream data between stages. A brief accounting of those efforts might be useful to capture here:

1. The first attempt tried to use the Ray Object Store to stream batches between stages. This was a challenge for two reasons. The first was that, under high throughput of potentially small items, the object store added too much latency to query processing. The second, and the reason this was abandoned, is that creating a shuffle writer exec plan node that jumps from Rust to Python to interact with the object store, and potentially calls back into Rust itself, proved difficult to manage and reason about.

2. The second attempt, and one I have discussed on Discord, was to adopt Arrow Flight for streaming between stages and flip the execution of the stages from pull to push. The thinking was to have each stage eagerly execute and stream batches to an `Exchange` Actor, which would hold a bunch of channels (num stages x num partitions per stage) and allow subsequent stages to consume from them. The problems here were that the `Exchange` Actor was difficult to tune and created an additional Arrow Flight hop. Another challenge was that DataFusion is inherently a pull-based architecture, which makes it very easy to compose and reason about. Flipping this was like swimming upstream and resulted in a lot of complications that DataFusion already manages elegantly.

   While it was interesting to consider push execution, and it may inform future work to consume from streams and materialize query results, ultimately it meant reimplementing a lot of things that DataFusion just makes easy.

3. The third attempt, and this iteration, is purely pull based and uses Arrow Flight to stream between stages. This turned out to produce the smallest amount of code, and code that was easy to work with and debug. It's as if you are executing DataFusion locally, but some of the execution nodes are connected with Arrow Flight instead of channels.

There is more that can be improved in performance and ergonomics, but this is quite usable as it is and will allow others to see and collaborate.

For examples, see the main readme, `examples/tips.py`, and `tpch/tpc.py`.

## Execution Strategy

DataFusion Ray will optimize a query plan and then break it into stages. Those stages will be scheduled as Ray Actors and make their partition streams available over Arrow Flight. Connecting one stage to another means adding a `RayStageReaderExec` node within the stage where a connection is required; that node fetches the stream using a `FlightClient`.

## Tunables

* `--isolate`: By default, DataFusion Ray will attempt to host each Stage as its own actor. This flag (in the examples, and a parameter to the `RayContext`) tells DataFusion Ray to host _each partition of each stage_ as its own Actor. This dramatically increases parallelism, but it is a blunt instrument; a more fine-tuned choice (like splitting a stage into x parts) would be more desirable and can be added in a future update.
* `--concurrency`: Controls the partition count for all stages, which is planned using DataFusion before control is handed to Ray. This interacts with `--isolate`.
* `--batch-size`: Controls the target (and also max) batch size exchanged between stages. Currently 8192 works for all queries in TPCH SF100. Going higher can produce Flight errors as we exceed the batch payload size.
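
To make the pull-based model and the tunables a little more concrete, here is a toy sketch in plain Python. It is not the code in this PR and does not use Ray, DataFusion, or Arrow Flight: generators stand in for partition streams, a dictionary stands in for the Flight endpoints, and every name in it (`stage_reader`, `make_scan_stage`, `make_sum_stage`, `actor_count`) is hypothetical. The actor count arithmetic assumes every stage has exactly `concurrency` partitions, which may not hold for every stage of a real plan.

```python
from typing import Callable, Dict, Iterator, List, Tuple

Batch = List[int]
# Hypothetical registry: (stage_id, partition) -> function that opens that
# partition's stream. In the PR this role is played by Arrow Flight endpoints.
Registry = Dict[Tuple[int, int], Callable[[], Iterator[Batch]]]


def stage_reader(registry: Registry, stage_id: int, partition: int) -> Iterator[Batch]:
    """Toy stand-in for a stage reader node: pulls batches lazily from upstream."""
    yield from registry[(stage_id, partition)]()


def chunk(rows: List[int], batch_size: int) -> Iterator[Batch]:
    """Split rows into batches of at most batch_size rows."""
    for i in range(0, len(rows), batch_size):
        yield rows[i:i + batch_size]


def make_scan_stage(registry: Registry, stage_id: int,
                    partitions: List[List[int]], batch_size: int) -> None:
    """Register one stream per partition; nothing runs until a consumer pulls."""
    for p, rows in enumerate(partitions):
        registry[(stage_id, p)] = lambda rows=rows: chunk(rows, batch_size)


def make_sum_stage(registry: Registry, stage_id: int,
                   upstream_id: int, upstream_partitions: int) -> None:
    """Register a single-partition stage that pulls every upstream partition."""
    def stream() -> Iterator[Batch]:
        total = 0
        for p in range(upstream_partitions):
            for batch in stage_reader(registry, upstream_id, p):
                total += sum(batch)
        yield [total]
    registry[(stage_id, 0)] = stream


def actor_count(num_stages: int, concurrency: int, isolate: bool) -> int:
    """One actor per stage, or one per partition of each stage when isolating.

    Assumes each stage has exactly `concurrency` partitions.
    """
    return num_stages * concurrency if isolate else num_stages


registry: Registry = {}
make_scan_stage(registry, 0, [[1, 2, 3], [4, 5], [6]], batch_size=2)
make_sum_stage(registry, 1, upstream_id=0, upstream_partitions=3)

# Pulling from the final stage drives the whole pipeline.
result = list(stage_reader(registry, 1, 0))
print(result)                      # [[21]]
print(actor_count(4, 8, False))    # 4
print(actor_count(4, 8, True))     # 32
```

The point of the sketch is that no stage does any work until a downstream consumer asks for a batch, which is the property that makes the pull model easy to compose. The last two lines show why `--isolate` is described as a blunt instrument: under the stated assumption, the number of actors grows with the product of stage count and concurrency.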