I've been looking a little bit at this in the context of Parquet files.

One of the read hot paths in cpp/src/parquet is the function that
reads and decompresses data pages from the stream:

(SerializedPageReader::NextPage)
https://github.com/apache/arrow/blob/master/cpp/src/parquet/column_reader.cc#L143

The control flow goes like this (a rough code sketch follows the list):

* We look forward in the stream until we find a complete Thrift data
page header. This may trigger zero or more Read calls to the
underlying "file" handle. In the default case, the data is all
actually in memory, so the reads are zero-copy buffer slices. The
reason we don't _always_ do zero copy is that some users want to
reduce the RAM footprint of the decoder (so we don't hold the whole
compressed column chunk in memory all at once).
* We deserialize the data page header
* We perform a Read to read the data page body
* We decompress the data page body
* Control returns to the main decoding loop that materializes values
from each data page into the output buffer
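
In rough pseudo-C++, that flow looks something like this (a sketch, not
the literal code; TryDeserializeThriftHeader and Decompress here are
hypothetical stand-ins for the header-parsing and codec steps):

// Approximate shape of SerializedPageReader::NextPage's hot path
while (true) {
  // May trigger zero or more Reads; a zero-copy buffer slice when the
  // compressed column chunk is already in memory
  stream_->Peek(allowed_page_size, &buffer);
  if (TryDeserializeThriftHeader(buffer, &page_header)) break;
  allowed_page_size *= 2;  // header incomplete: widen the peek window
}
stream_->Read(page_header.compressed_page_size, &page_buffer);  // page body
page_buffer = Decompress(page_buffer);  // no-op if codec is UNCOMPRESSED
// control then returns to the decode loop that materializes values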

Under the programming models proposed:

# Model A (CPU threads signal "idleness", causing a temporary increase
in the number of running tasks)

Pros:
- Relatively simple for the developer. Instead of writing
stream_->Peek(allowed_page_size, &buffer);

we write something like

exec_ctx_->WaitIO([&]() { stream_->Peek(allowed_page_size, &buffer); });

The IO-wait signal could also be pushed down into stream_'s
implementation so that in the zero-copy case there is no overhead.

Cons:
- Not sure about the context-switching implications, since a hot loop
might cause jumps between CPU cores (I'm really out of my depth
here...). It makes me wonder if we need to look at something optimized
for high-performance asynchronous task scheduling (like the libraries
mentioned below).
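
To make Model A concrete, here is a minimal sketch of the soft-limit
mechanism under discussion; ThreadPool, NotifyIOStart/NotifyIOEnd, and
ExecContext are all hypothetical names (nothing like this exists in
Arrow today), and waking a queued task when the limit grows is elided:

#include <atomic>
#include <utility>

class ThreadPool {
 public:
  // A worker is about to block on IO: allow one extra task to run
  void NotifyIOStart() { extra_slots_.fetch_add(1); }
  // The IO call returned: shrink back toward the hard limit
  void NotifyIOEnd() { extra_slots_.fetch_sub(1); }

 private:
  std::atomic<int> extra_slots_{0};  // soft limit on top of the hard limit
};

class ExecContext {
 public:
  explicit ExecContext(ThreadPool* pool) : pool_(pool) {}

  // Run a blocking IO callable, signalling the pool so that another
  // queued task may be scheduled while this thread waits
  template <typename Func>
  void WaitIO(Func&& io_func) {
    pool_->NotifyIOStart();
    std::forward<Func>(io_func)();
    pool_->NotifyIOEnd();
  }

 private:
  ThreadPool* pool_;
};

(A real WaitIO would want RAII around the notify calls so the decrement
happens even if io_func throws.)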

# Model B (CPU and IO work split into tasks that execute on different
thread queues)

Pros:
- Not sure

Cons:
- Could cause performance issues if the IO tasks are mostly free (e.g.
due to buffering)
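
In code, Model B might look roughly like the following; io_pool_ and
cpu_pool_ are separate hypothetical pools, Spawn stands in for whatever
submission API we end up with, and DecodePage is a placeholder for the
CPU-side work (I'm also assuming a Peek variant that hands back a
shared buffer):

io_pool_->Spawn([this]() {
  std::shared_ptr<Buffer> buffer;
  stream_->Peek(allowed_page_size, &buffer);  // only an IO thread blocks
  cpu_pool_->Spawn([this, buffer]() {
    DecodePage(buffer);  // CPU-bound continuation runs on the CPU pool
  });
});

Note the con above: when Peek is serviced from memory, the round trip
through the IO pool is pure overhead.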

The ideal approach could actually be a hybrid of Models A and B --
there's no particular reason the two programming models cannot coexist
(except that code using the Model A approach might slow down code that
has been optimized for Model B).

I think we need to investigate some asynchronous C++ programming libraries like

https://github.com/facebook/folly/tree/master/folly/fibers

to see how organizations with mature C++ practices are handling these
issues from a programming model standpoint.

On Mon, Jul 15, 2019 at 3:15 PM Wes McKinney <wesmck...@gmail.com> wrote:
>
> On Mon, Jul 15, 2019 at 12:01 PM Antoine Pitrou <solip...@pitrou.net> wrote:
> >
> > On Mon, 15 Jul 2019 11:49:56 -0500
> > Wes McKinney <wesmck...@gmail.com> wrote:
> > >
> > > For example, suppose we had a thread pool with a limit of 8 concurrent
> > > tasks. Now 4 of them perform IO calls. Hypothetically this should
> > > happen:
> > >
> > > * Thread pool increments a "soft limit" to allow 4 more tasks to
> > > spawn, so at this point technically we have 12 active tasks
> > > * When each IO call returns, the soft limit is decremented
> > > * The soft limit can be constrained to be some multiple of the hard
> > > limit. So if we have a hard limit of 8 CPU-bound threads, then we
> > > might allow an additional 8 tasks to be spawned if a CPU bound thread
> > > indicates that it's waiting for IO
> >
> > Well, there are two approaches to this:
> >
> > * the approach you are proposing
> > * the approach where IO is done in separate worker threads so that we
> >   needn't resize the main thread pool when IO is done
> >
> > Advantages of the second approach:
> >
> > * No need to dynamically resize the main thread pool (which may
> >   be difficult to achieve in an efficient manner).
> > * CPU-bound threads can stay pinned on the same HW cores and threads
> >   most of the time, which is probably good for cache locality and to
> >   avoid migration costs.
> >
> > Advantages of the first approach:
> >
> > * The programming model is probably simpler.
> >
> > Also, the first approach is not workable if e.g. TBB doesn't support it
> > (?).
>
> Agreed with both points. I'd like to investigate these approaches to
> see what makes the most sense from a programming model and efficiency
> / performance standpoint.
>
> Currently we have lots of code that looks like (pseudocode)
>
> function Func(State* mutable_state) {
>    CPUTask1(mutable_state);
>    IOTask1(mutable_state);
>    CPUTask2(mutable_state);
>    IOTask2(mutable_state);
>    CPUTask3(mutable_state);
>    ...
> }
>
> Either approach is going to require us to develop a programming model
> where a task scheduler is passed into many functions, so such code has
> to be refactored to push work into the scheduler rather than doing the
> work in the current thread. You could certainly argue that we should
> elect for an API which maximizes our flexibility with regards to
> scheduling work (e.g. having separate thread pools for IO and CPU).
>
> Task scheduling may also need to be aware of IO resource identities to
> control concurrent reads of sources that are sensitive to that (e.g.
> some filesystems may work fine accessed by 16 threads in parallel,
> where others will not).
>
> Probably we need to figure out at least what the programming model
> ought to look like so we can start refactoring old code (e.g.
> parquet-cpp internals) and writing new code in a more
> concurrency-minded way.
>
> >
> > Regards
> >
> > Antoine.
> >
> >
