tustvold opened a new issue, #2504:
URL: https://github.com/apache/arrow-datafusion/issues/2504

   Following on from #2199, the next piece of the puzzle is how to handle IO in 
the context of the new scheduler, in particular interaction with object 
storage. Much of this work has already been started, but as @alamb rightly 
pointed out, how everything fits together is not fully articulated anywhere. 
This is my attempt to do just that.
   
   As described in #2489, I intend to polish and release the object_store 
abstraction found in 
[IOx](https://github.com/influxdata/influxdb_iox/tree/main/object_store) to 
crates.io. This will in turn allow using it in arrow-rs and DataFusion. Much of 
the rationale for this is covered in #2489 and #2445, but specifically for the 
morsel-driven IO component, moving away from the chunk_reader notions of `Read` 
and `AsyncRead` is important, as these don't map well to parquet files in 
object storage (https://github.com/apache/arrow-rs/issues/1473).
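
   To make this concrete, below is a deliberately simplified, hypothetical sketch of 
the range-based style of interface `object_store` exposes; the real trait has more 
methods (list, put, delete, ...) and richer path and error types, so the names here 
are illustrative only.

```rust
use std::ops::Range;

use async_trait::async_trait;
use bytes::Bytes;

/// Hypothetical, simplified sketch of a range-based object store interface.
/// Reads are expressed as explicit byte ranges rather than via `Read`/`AsyncRead`,
/// which is what makes selective reads of parquet footers and column chunks
/// practical against object storage.
#[async_trait]
pub trait ObjectStoreSketch: Send + Sync + 'static {
    /// Fetch the requested byte range of the object at `path`
    async fn get_range(&self, path: &str, range: Range<usize>) -> std::io::Result<Bytes>;

    /// Return the total size in bytes of the object at `path`
    async fn size(&self, path: &str) -> std::io::Result<usize>;
}
```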
   
   The next step will be to integrate `object_store` with `parquet` as part of 
https://github.com/apache/arrow-rs/issues/1605. This will provide an interface 
to stream `RecordBatch` from parquet files located on object store, with 
support for projection-pushdown and row-group filtering. _This will eventually 
integrate with predicate-pushdown 
(https://github.com/apache/arrow-rs/issues/1191), but one step at a time_. 
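
   To illustrate the kind of interface this is aiming at, here is a hypothetical usage 
sketch. The actual API is still being designed in https://github.com/apache/arrow-rs/issues/1605, 
so the type names (`ParquetObjectReader`, `ParquetRecordBatchStreamBuilder`) and method 
signatures below are placeholders, not the final design.

```rust
use std::sync::Arc;

use futures::StreamExt;

// `ObjectStoreSketch`, `ParquetObjectReader` and `ParquetRecordBatchStreamBuilder`
// are placeholder names for the integration being designed in arrow-rs#1605.
async fn scan_parquet(store: Arc<dyn ObjectStoreSketch>, path: &str) -> anyhow::Result<()> {
    // Adapter translating the parquet reader's requests into `ObjectStore` range
    // reads (footer first, then only the required column chunks)
    let reader = ParquetObjectReader::new(store, path.to_string());

    // Build an async stream of `RecordBatch`, pushing down a column projection
    // and restricting decoding to a single row group
    let mut stream = ParquetRecordBatchStreamBuilder::new(reader)
        .await?
        .with_projection(vec![0, 2])
        .with_row_groups(vec![1])
        .build()?;

    while let Some(batch) = stream.next().await {
        let batch = batch?;
        // Hand the decoded batch off to the rest of the query plan
        println!("decoded {} rows", batch.num_rows());
    }
    Ok(())
}
```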
   
   Other row-oriented formats, e.g. CSV, JSON, etc., will not require custom 
support in arrow-rs, as pushdown cannot be performed through standard object 
store interfaces. The query engine will need to fetch the raw data, potentially 
utilising features such as S3 Select, and stream it through the synchronous 
arrow-rs decoders.
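
   For example, a minimal sketch of the decode side, assuming the bytes have already 
been fetched from object storage (the exact `ReaderBuilder` options differ slightly 
between arrow versions):

```rust
use std::io::Cursor;
use std::sync::Arc;

use arrow::csv::ReaderBuilder;
use arrow::datatypes::Schema;
use arrow::error::Result;
use arrow::record_batch::RecordBatch;

/// Decode CSV bytes fetched from object storage (IO-bound, tokio side) into
/// `RecordBatch`es using the synchronous arrow-rs reader (CPU-bound, rayon side).
fn decode_csv(bytes: Vec<u8>, schema: Arc<Schema>) -> Result<Vec<RecordBatch>> {
    let reader = ReaderBuilder::new()
        .with_schema(schema)
        .has_header(true)
        .build(Cursor::new(bytes))?;

    // The reader is a plain `Iterator<Item = Result<RecordBatch>>`
    reader.collect()
}
```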
   
   In order to integrate this with the new scheduler an implementation of 
`ObjectStore` will be needed that takes a `tokio::runtime::Handle` and an 
existing `Arc<dyn ObjectStore>`, and spawns the async work on that runtime. 
This can then be used by ParquetExec and friends; a rough sketch follows the 
list below.
   
   This is necessary for a few reasons:
   
   * The scheduler uses rayon and not tokio, and many `ObjectStore` implementations 
will use primitives that require a tokio runtime
   * Scheduling CPU-bound work on the same threads as IO is likely to result in 
instability, as the CPU-bound work will only yield sporadically
   * We want the CPU-bound parquet decoding to occur on the rayon threadpool 
where it can't starve IO tasks
   * We want the IO-bound network fetch to occur on the tokio threadpool where 
it can be efficiently multiplexed
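
   A rough sketch of what such a wrapper could look like, reusing the simplified 
`ObjectStoreSketch` trait from the sketch above (the real version would forward the 
full `ObjectStore` interface and handle errors more carefully):

```rust
use std::ops::Range;
use std::sync::Arc;

use async_trait::async_trait;
use bytes::Bytes;
use tokio::runtime::Handle;

/// Wraps an inner store and a handle to a dedicated tokio runtime, so that all
/// IO is spawned onto that runtime rather than running on the rayon worker threads.
struct SpawnedObjectStore {
    io_runtime: Handle,
    inner: Arc<dyn ObjectStoreSketch>,
}

#[async_trait]
impl ObjectStoreSketch for SpawnedObjectStore {
    async fn get_range(&self, path: &str, range: Range<usize>) -> std::io::Result<Bytes> {
        let inner = Arc::clone(&self.inner);
        let path = path.to_string();
        // Run the network fetch on the IO runtime; the returned JoinHandle is a
        // future the caller (e.g. ParquetExec) can await without doing IO itself
        self.io_runtime
            .spawn(async move { inner.get_range(&path, range).await })
            .await
            .expect("IO task panicked")
    }

    async fn size(&self, path: &str) -> std::io::Result<usize> {
        let inner = Arc::clone(&self.inner);
        let path = path.to_string();
        self.io_runtime
            .spawn(async move { inner.size(&path).await })
            .await
            .expect("IO task panicked")
    }
}
```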
   
   The end result is a clear separation between IO-bound and CPU-bound work, 
in particular:
   
   * Tokio is solely used to multiplex IO-bound work, ensuring stable tail 
latencies
   * Rayon is used to perform synchronous, CPU-bound computations as part of 
the morsel-driven execution described in #2199
   
   Thoughts, concerns, feedback, etc. are most welcome; nothing here is set in 
stone, but this is my current plan of action. Let me know what you think :smile:

