[
https://issues.apache.org/jira/browse/ARROW-14524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17437306#comment-17437306
]
Weston Pace edited comment on ARROW-14524 at 11/2/21, 11:27 AM:
----------------------------------------------------------------
So I did a little bit of thinking through this today and I think there are
merits to both approaches.
1. Pushing the scheduling into the filesystem
This would basically mean a filesystem wrapper with a read method like...
{code:python}
def read_async(self, pos, off):
    if self.num_active_readers < self.max_concurrent_readers:
        # an idle slot is available, so issue the read immediately
        return self.do_read(pos, off)
    else:
        # adds the range to a queue of ranges to read (possibly merging at this point)
        # returns a future that will finish when the item pops off the queue and is processed
        return self.schedule_read(pos, off)
{code}
* + One spot fixes the problem for all formats
* + Ability to coalesce across workflows (I doubt this will ever be needed)
* - Higher latency because the first few requests need to "prime the pump" (i.e. they will be issued immediately since there is an idle slot, so no batching will happen)
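The queueing branch of the wrapper above can be approximated with a thread pool whose size is the concurrency limit. Here is a minimal sketch under that assumption (the class and method names, and the `read_at` call on the underlying file, are illustrative, not Arrow's actual API); it defers the range-merging piece entirely:
{code:python}
from concurrent.futures import ThreadPoolExecutor

class SchedulingFile:
    """Hypothetical wrapper: caps in-flight reads at max_concurrent_readers."""
    def __init__(self, raw_file, max_concurrent_readers=4):
        self.raw = raw_file
        # The worker pool enforces the concurrency limit; requests beyond the
        # limit queue inside the executor until a slot frees up, which plays
        # the role of schedule_read above (minus the merging).
        self.pool = ThreadPoolExecutor(max_workers=max_concurrent_readers)

    def read_async(self, pos, off):
        # Returns a future that completes when the read is performed.
        return self.pool.submit(self.raw.read_at, pos, off)

# Tiny in-memory stand-in for a real filesystem file (illustrative only).
class FakeRaw:
    def __init__(self, data):
        self.data = data
    def read_at(self, pos, off):
        return self.data[pos:pos + off]

scheduled = SchedulingFile(FakeRaw(b"abcdefgh"), max_concurrent_readers=2)
result = scheduled.read_async(1, 3).result()
{code}
Note that delegating queueing to the executor means merging would have to happen before submission, which is part of why the latency trade-off above exists.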
2. Keeping scheduling in the readers
This means the file readers need a 2-step read where the first step simply
determines the ranges to read:
{code:python}
def read_row_group(self, i):
    # step 1: announce the ranges so the scheduler can batch them
    ranges = self.compute_ranges(i)
    self.file.will_need(ranges)
    # step 2: perform the actual read, now against prefetched data
    return self.do_read_row_group(i)
{code}
* + Lower latency because readers know which reads are related
* - Requires a 2-pass algorithm: first compute the ranges (e.g. parquet::ParquetFileReader::PreBuffer), then issue the reads
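To make the 2-step shape concrete, here is a hypothetical sketch of a file that implements `will_need` as a prefetch into a cache (names and the `read_at` call are assumptions for illustration, not Arrow's actual interface):
{code:python}
class PrefetchingFile:
    """Hypothetical 2-step file: will_need() prefetches, read() hits the cache."""
    def __init__(self, raw_file):
        self.raw = raw_file
        self.cache = {}   # (pos, length) -> bytes

    def will_need(self, ranges):
        # Step 1: the reader announces every range it is about to read, so a
        # real implementation could batch/coalesce them here with full knowledge.
        for pos, length in ranges:
            self.cache[(pos, length)] = self.raw.read_at(pos, length)

    def read(self, pos, length):
        # Step 2: serve from the prefetched cache, falling back to a direct
        # read on a miss.
        data = self.cache.pop((pos, length), None)
        if data is None:
            data = self.raw.read_at(pos, length)
        return data

# Tiny in-memory stand-in that counts raw reads (illustrative only).
class FakeRaw:
    def __init__(self, data):
        self.data = data
        self.calls = 0
    def read_at(self, pos, length):
        self.calls += 1
        return self.data[pos:pos + length]

raw = FakeRaw(b"0123456789")
prefetched = PrefetchingFile(raw)
prefetched.will_need([(2, 3), (7, 2)])
row_group = prefetched.read(2, 3)
{code}
After `will_need`, the subsequent `read` is served from the cache without touching the underlying file again, which is what gives this approach its latency advantage.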
At the moment I'm proceeding with the status quo and prototyping a 2-step read for the IPC reader (via an ipc::RecordBatchFileReader::WillNeedBatches method).
> [C++] Create plugging/coalescing filesystem wrapper
> ---------------------------------------------------
>
> Key: ARROW-14524
> URL: https://issues.apache.org/jira/browse/ARROW-14524
> Project: Apache Arrow
> Issue Type: New Feature
> Components: C++
> Reporter: Weston Pace
> Assignee: Weston Pace
> Priority: Major
>
> We have I/O optimizations scattered across some of our readers. The most
> prominent example is prebuffering in the parquet reader. However, these
> techniques are rather general purpose and apply to IPC (see ARROW-14229) as
> well as to other readers (e.g. Orc, maybe even CSV).
> This filesystem wrapper will not generally be necessary for local filesystems,
> as the OS's filesystem scheduler is sufficient. Most of these goals can be
> accomplished by simply aiming for some configurable degree of parallelism (e.g.
> if there are already X requests in progress, then start batching).
> Goals:
> * Batch consecutive small requests into fewer large requests
> * We could plug (configurably) small holes in read ranges as well
> * Potentially split large requests into concurrent small requests
> * Support for the RandomAccessFile::WillNeed command by prefetching ranges
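The first two goals (batching consecutive small requests and plugging small holes) reduce to a single range-coalescing pass. A minimal sketch, assuming ranges are (position, length) tuples; the `hole_size_limit` parameter name is an assumption for illustration, not a committed option name:
{code:python}
def coalesce_ranges(ranges, hole_size_limit=8192):
    # Sort by position, then merge any range that starts within
    # hole_size_limit bytes of the previous range's end; this both batches
    # consecutive requests and plugs small holes between nearby ones.
    merged = []
    for pos, length in sorted(ranges):
        if merged and pos <= merged[-1][0] + merged[-1][1] + hole_size_limit:
            prev_pos, prev_len = merged[-1]
            merged[-1] = (prev_pos, max(prev_pos + prev_len, pos + length) - prev_pos)
        else:
            merged.append((pos, length))
    return merged

# Adjacent ranges merge even with no hole tolerance; a 5-byte hole
# survives when the limit is smaller than the gap.
adjacent = coalesce_ranges([(0, 10), (10, 10)], hole_size_limit=0)
gapped = coalesce_ranges([(0, 10), (15, 5)], hole_size_limit=4)
{code}
Splitting large requests into concurrent small ones would be the inverse pass and is not shown here.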