jorgecarleitao commented on pull request #8283: URL: https://github.com/apache/arrow/pull/8283#issuecomment-699591199
This is super exciting!!!! Thanks a lot @andygrove for pushing this through! I am trying to understand how this relates to other execution architectures in Rust (e.g. Tokio) and, more broadly, to the traits in the [`futures`](https://docs.rs/crate/futures/0.3.5) crate. My current hypothesis is that this may be simpler if we leverage some of the existing traits in `futures`.

In my current understanding of DataFusion's execution plan, `ExecutionPlan` behaves like an `IntoIter` whose iterator yields the parts of a partition via `execute(part_index)`, where `part_index`'s range is given by `output_partitioning().partition_count()` (longer version [here](https://docs.google.com/document/d/1yREyFSA1Fx1WMC0swUAZBbNlKPmTyebzAiQ8Eu6Ynpg/edit?usp=sharing)). In fact, with [some small changes](https://github.com/jorgecarleitao/arrow/pull/8) (test [here](https://github.com/jorgecarleitao/arrow/pull/8/files#diff-f92f5cc2c20e4cfba21c282e728d53e4R68-R92)), we can iterate over a full partition as follows:

```rust
let plan: &dyn ExecutionPlan = ...;
plan.into_iter()
    .map(|maybe_part| {
        let part = maybe_part.unwrap(); // todo: handle error via a special flatten
        part.into_iter().map(|maybe_batch| {
            println!("{:?}", maybe_batch?.num_rows());
            Ok(())
        })
    })
    .flatten()
    .collect::<Result<()>>()?;
```

The problem with this is that it requires each node to be responsible for spawning threads, which, as identified in ARROW-9707, is problematic. To address this, we need to make `execute` something that a scheduler can chew in parts. For that reason, I think that #8285, which proposes making `execute` `async`, is beautiful! **But** if `execute` is `async`, then I think that `ExecutionPlan` could implement [`IntoStream`](https://docs.rs/futures/0.3.5/futures/future/struct.IntoStream.html).
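To make the "`ExecutionPlan` as an iterator over partitions" reading concrete, here is a self-contained sketch. `Batch`, `MemoryExec`, and the stripped-down `ExecutionPlan` trait below are hypothetical stand-ins, not DataFusion's real API; the point is only the shape of `partition_count()` + `execute(part_index)`:

```rust
use std::vec::IntoIter;

#[derive(Debug, Clone)]
struct Batch {
    num_rows: usize,
}

trait ExecutionPlan {
    /// The valid range of `part_index` for `execute`.
    fn partition_count(&self) -> usize;
    /// Yields the batches of one partition, analogous to `execute(part_index)`.
    fn execute(&self, part_index: usize) -> IntoIter<Result<Batch, String>>;
}

/// A leaf node that serves pre-materialized partitions.
struct MemoryExec {
    partitions: Vec<Vec<Batch>>,
}

impl ExecutionPlan for MemoryExec {
    fn partition_count(&self) -> usize {
        self.partitions.len()
    }
    fn execute(&self, part_index: usize) -> IntoIter<Result<Batch, String>> {
        self.partitions[part_index]
            .iter()
            .cloned()
            .map(Ok)
            .collect::<Vec<_>>()
            .into_iter()
    }
}

/// Drive the plan as an iterator of partition iterators, flattening the batches.
fn total_rows(plan: &dyn ExecutionPlan) -> Result<usize, String> {
    (0..plan.partition_count())
        .flat_map(|i| plan.execute(i))
        .map(|maybe_batch| maybe_batch.map(|b| b.num_rows))
        .sum()
}

fn main() {
    let plan = MemoryExec {
        partitions: vec![
            vec![Batch { num_rows: 3 }, Batch { num_rows: 2 }],
            vec![Batch { num_rows: 5 }],
        ],
    };
    println!("total rows: {}", total_rows(&plan).unwrap());
}
```

Note that in this synchronous form the consumer (here, `total_rows`) drives everything on one thread, which is why each node ends up spawning its own threads to get parallelism.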
In this scenario, `ExecutionPlan` would become

> an object that knows how to convert itself into a stream of record batch iterators

The advantage is that any scheduler that consumes [`futures::Stream`](https://docs.rs/futures/0.3.5/futures/stream/trait.Stream.html) can pick up this stream and execute it. Since it now knows how to stop in the middle of `execute` in case something is blocking it (including any of its `input`s), it can switch tasks ad hoc. In other words, IMO we can leverage any async library to run this stream.

In this scenario, one idea to address ARROW-9707 is:

1. land #8285
2. implement `IntoStream` for `ExecutionPlan`
3. migrate our calls to `thread::spawn` in the different nodes to `.await`
4. pick a scheduler off the shelf and run the stream with it.

If we want some customization (e.g. logging, some special naming for the tasks), we can always add a `trait DataFusionStream: Stream<...>` and implement it on top of `Stream`.

Note that this would effectively make our `ExecutionPlan` a dynamically-typed stream adapter (a la [futures::stream::Select](https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.9/futures/stream/struct.Select.html), but dynamic): it consumes one or more streams and outputs a new stream. Conceptually, IMO this is exactly what an `ExecutionPlan` is.

In a distributed environment, the main difference would be that the physical planner would plan an `ExecutionPlan` whose `execute` includes submitting a job to a worker and waiting for the result (e.g. via a TCP channel); the result is still a stream, though, and the scheduler can decide to wait for network + the node's compute and perform a switch, the same way it can wait for I/O.

I am sorry that I only came up with this idea now. I finally understood the implications of #8285.
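The "stream that a scheduler can chew in parts" idea can be sketched without any external crate: `BatchStream` below is a hypothetical trait mirroring the shape of `futures::Stream` (`poll_next` returning `Poll`), and `drain` plays the role of a trivial scheduler. All names here are invented for illustration; only `std::task` types are real:

```rust
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

struct Batch {
    num_rows: usize,
}

/// Minimal stand-in for `futures::Stream`, specialized to record batches.
trait BatchStream {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Batch>>;
}

/// A node whose output is a stream. This toy version drains a pre-materialized
/// partition; a real node would return `Poll::Pending` while waiting on its
/// inputs (or on the network), which is exactly what lets a scheduler switch
/// tasks instead of blocking a thread.
struct MemoryStream {
    batches: Vec<Batch>,
}

impl BatchStream for MemoryStream {
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<Batch>> {
        Poll::Ready(self.batches.pop())
    }
}

/// A no-op `Waker` so this sketch can drive the stream synchronously.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

/// A trivial "scheduler": polls the stream to completion, counting rows.
fn drain(stream: &mut dyn BatchStream) -> usize {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut rows = 0;
    loop {
        match stream.poll_next(&mut cx) {
            Poll::Ready(Some(batch)) => rows += batch.num_rows,
            Poll::Ready(None) => return rows,
            Poll::Pending => { /* a real scheduler would park this task and run another */ }
        }
    }
}

fn main() {
    let mut stream = MemoryStream {
        batches: vec![
            Batch { num_rows: 3 },
            Batch { num_rows: 2 },
            Batch { num_rows: 5 },
        ],
    };
    println!("rows: {}", drain(&mut stream));
}
```

The `Poll::Pending` arm is the whole point: because the stream surfaces "not ready yet" as a value instead of blocking, an off-the-shelf executor can interleave many such streams on a small thread pool, which is what steps 3 and 4 above rely on.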
