jorgecarleitao commented on pull request #8283:
URL: https://github.com/apache/arrow/pull/8283#issuecomment-699591199


   This is super exciting!!!! Thanks a lot @andygrove for pushing this through!
   
   I am trying to understand how this relates to other execution architectures in Rust (e.g. Tokio) and, more broadly, to the traits in the [`futures`](https://docs.rs/crate/futures/0.3.5) crate.
   
   My current hypothesis is that this may be simpler if we leverage some of the 
existing traits in `futures`.
   
   In my current understanding of DataFusion's execution model, `ExecutionPlan` behaves like an `IntoIter` whose iterator iterates over the parts of a partition, 
via `execute(part_index)`, where `part_index`'s range is given by 
`output_partitioning().partition_count()` (Longer version 
[here](https://docs.google.com/document/d/1yREyFSA1Fx1WMC0swUAZBbNlKPmTyebzAiQ8Eu6Ynpg/edit?usp=sharing)).
   
   In fact, with [some small 
changes](https://github.com/jorgecarleitao/arrow/pull/8) (test 
[here](https://github.com/jorgecarleitao/arrow/pull/8/files#diff-f92f5cc2c20e4cfba21c282e728d53e4R68-R92)),
 we can iterate over a full partition as follows:
   
   ```rust
   let plan: &dyn ExecutionPlan = ...
   
   plan.into_iter().map(|maybe_part| {
       let part = maybe_part.unwrap();  // todo handle error via special flatten
       part.into_iter().map(|maybe_batch| {
           println!("{:?}", maybe_batch?.num_rows());
           Ok(())
       })
   })
   .flatten()
   .collect::<Result<()>>()?;
   ```
   
   The problem with this is that it requires each node to be responsible for 
spawning threads, which, as identified in ARROW-9707, is problematic.
   
   To address this, we need to make `execute` something that a scheduler can consume in parts. For that reason, I think that #8285, which proposes making `execute` `async`, is beautiful!
   
   **But**, if `execute` is `async`, then I think that `ExecutionPlan` could implement [`IntoStream`](https://docs.rs/futures/0.3.5/futures/future/struct.IntoStream.html). In this scenario, `ExecutionPlan` would become
   
   >  an object that knows how to convert itself into a stream of record batch iterators
   
   The advantage is that any scheduler that consumes [`futures::Stream`](https://docs.rs/futures/0.3.5/futures/stream/trait.Stream.html) can pick up this stream and execute it. Since it now knows how to stop in the middle of `execute` whenever something is blocking (including any of the plan's `input`s), it can switch tasks ad hoc. In other words, IMO we can leverage any async library to run this stream.
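
   To make this concrete, here is a minimal sketch using only the `futures` crate (strictly speaking, in `futures` 0.3 `IntoStream` is an adapter struct rather than a trait, so in practice this would be a method returning an `impl Stream`). `RecordBatch`, `execute`, and `plan_into_stream` below are toy stand-ins, not the real DataFusion types:
   
   ```rust
   use futures::stream::{self, Stream, StreamExt};
   
   // Toy stand-in for arrow::record_batch::RecordBatch.
   type RecordBatch = Vec<i64>;
   
   // Sketch of the async `execute` proposed in #8285: each partition
   // resolves to a stream of record batches.
   async fn execute(partition: usize) -> impl Stream<Item = RecordBatch> {
       stream::iter((0..3).map(move |i| vec![partition as i64, i]))
   }
   
   // The whole plan exposed as one flat stream of batches. Any scheduler
   // that consumes `futures::Stream` can drive it, and it can switch
   // tasks at every `.await` point, e.g. while an input is blocked.
   fn plan_into_stream(partition_count: usize) -> impl Stream<Item = RecordBatch> {
       stream::iter(0..partition_count).then(execute).flatten()
   }
   
   fn main() {
       let batches = futures::executor::block_on(plan_into_stream(2).collect::<Vec<_>>());
       println!("{:?}", batches);
   }
   ```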
   
   In this scenario, one idea to address ARROW-9707 is:
   
   1. land #8285
   2. implement `IntoStream` for `ExecutionPlan`
   3. migrate our calls of `thread::spawn` in the different nodes to `.await` (a sketch of this follows the list)
   4. pick an off-the-shelf scheduler and run the stream with it.
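
   As a sketch of step 3 (with a toy `execute` and `merge`, not the actual node code), the per-node pattern would change roughly like this:
   
   ```rust
   use futures::future;
   
   // Toy stand-in for executing one input partition.
   async fn execute(partition: usize) -> Vec<i64> {
       vec![partition as i64]
   }
   
   // Before (the ARROW-9707 problem): the node itself calls
   // `std::thread::spawn` once per input partition and joins the handles.
   // After: the node just awaits its inputs; it owns no threads, and the
   // executor is free to interleave this work with other tasks.
   async fn merge(partition_count: usize) -> Vec<Vec<i64>> {
       future::join_all((0..partition_count).map(execute)).await
   }
   
   fn main() {
       let merged = futures::executor::block_on(merge(4));
       println!("{:?}", merged);
   }
   ```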
   
   If we want some customization (e.g. logging, some special naming for the 
tasks), we can always add a `trait DataFusionStream: Stream<...>` and implement 
it on top of `Stream`.
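
   For example (all names here hypothetical), such a trait could refine `Stream` without losing compatibility with generic executors:
   
   ```rust
   use futures::stream::Stream;
   
   // Toy stand-ins for illustration.
   type RecordBatch = Vec<i64>;
   type Result<T> = std::result::Result<T, String>;
   
   // Hypothetical trait: still a plain `futures::Stream`, so any
   // off-the-shelf executor can run it, but with room for extras such
   // as a task name for logging.
   trait DataFusionStream: Stream<Item = Result<RecordBatch>> {
       fn task_name(&self) -> &str {
           "unnamed"
       }
   }
   
   // Blanket impl: every stream of record batches qualifies by default.
   impl<S> DataFusionStream for S where S: Stream<Item = Result<RecordBatch>> {}
   ```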
   
   Note that this would effectively make our `ExecutionPlan` a dynamically-typed stream adapter (à la [futures::stream::Select](https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.9/futures/stream/struct.Select.html), but dynamic): it consumes one or more streams and outputs a new stream. Conceptually, IMO this is exactly what an `ExecutionPlan` is.
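
   For comparison, here is what the static version of such an adapter looks like in `futures` (a toy with integer items, not DataFusion types):
   
   ```rust
   use futures::executor::block_on;
   use futures::stream::{self, StreamExt};
   
   fn main() {
       let left = stream::iter(vec![1, 3, 5]);
       let right = stream::iter(vec![2, 4, 6]);
   
       // `select` consumes two streams and yields a new, interleaved one:
       // the same shape as an `ExecutionPlan` node, except that our
       // inputs would be wired up dynamically by the physical planner.
       let merged = block_on(stream::select(left, right).collect::<Vec<i32>>());
       println!("{:?}", merged);
   }
   ```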
   
   In a distributed environment, the main difference would be that the physical planner would plan `ExecutionPlan`s whose `execute` includes submitting a job to a worker and waiting for the result (e.g. via a TCP channel); the result is still a stream, though, and the scheduler can decide to wait for the network plus the node's compute and perform a switch, the same way it can wait for I/O.
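
   A toy sketch of that shape, with an in-process `futures::channel::mpsc` channel standing in for the TCP connection:
   
   ```rust
   use futures::channel::mpsc;
   use futures::{executor::block_on, join, SinkExt, StreamExt};
   
   // Toy batch type; a real worker would send Arrow data over the wire.
   type RecordBatch = Vec<i64>;
   
   fn main() {
       block_on(async {
           let (mut tx, rx) = mpsc::channel::<RecordBatch>(8);
   
           // Stand-in for the remote worker computing a partition.
           let worker = async move {
               for i in 0..3 {
                   tx.send(vec![i]).await.unwrap();
               }
           };
   
           // The plan's `execute` would hand back `rx` as its stream; the
           // scheduler awaits the network exactly as it awaits local I/O.
           let consume = async move {
               let batches: Vec<RecordBatch> = rx.collect().await;
               println!("{:?}", batches);
           };
   
           join!(worker, consume);
       });
   }
   ```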
   
   I am sorry that I only came up with this idea now. I finally understood the 
implications of #8285.

