We are building a highly concurrent database for security data with Arrow
as data plane (VAST <https://github.com/tenzir/vast>), so I thought I'll
share our view on this since we went over pretty much all of the above
mentioned questions. I'm not trying to say "you should do it this way" but
instead share our journey, in the hope that you can draw some insight from
it.

It sounds like there are several challenges to be solved, ranging from
non-blocking I/O to efficient task-based scheduling - for some notion of
task. We found that the actor model
<https://docs.tenzir.com/vast/architecture/actor-model/> solved all of
these challenges. In particular, we rely on CAF
<https://github.com/actor-framework/actor-framework>, the C++ Actor
Framework as concrete implementation. The basic abstraction is that an
actor, which is effectively a heavy-weight task (100-200 bytes) that can be
scheduled in two ways: on a dedicated thread or in a thread pool. The
thread pool is driven either by a work-stealing or work-sharing scheduler,
based on the deployment environment. Here's how we solve some concrete
problems with these abstractions:

   1. *Asynchronous I/O*: we have one "detached" filesystem actor that
   lives in its own thread that does all I/O. All reads and writes (mmap too,
   but let's take that aside) go through this actor. You'd request a write
   with a contiguous chunk of data, and get a response back when the operation
   succeeded. Any actor can interact with the filesystem actor, also actors
   that are scheduled in the pool. The point of this abstraction is that I/O
   operations can block, which is why you never want to execute them in the
   thread pool. Otherwise all other tasks/actors that are scheduled right
   behind the I/O operation might get stalled. Sure, work-stealing will
   alleviate this, but not when steals occur frequently.
   2. *Concurrent task execution*: if the work can be *overdecomposed* into
   smaller chunks such that there are more chunks than CPU cores, a thread
   pool plus scheduler will do a good job at exploiting the available system
   concurrency. In our use case, we build dozens of indexes in parallel, with
   one actor being responsible for one column. They all run in parallel and
   independent of each other, but operate on the same (immutable) data. So
   there are no data races by design. Once an indexer actor completes a set of
   record batches, it builds a flatbuffer and ships it to the I/O actor to
   persist it. At that point we're in point (1), asynchronous I/O. This plays
   together nicely.
   3. *Network transparency*: Since the actor model communication
   abstraction is message passing, it's easy to hide the actor location, be it
   in memory or remotely available via IPC. The actor runtime takes care of
   either just passing a pointer with the message contents or doing the
   transparent serialization. This makes it very easy to build a distributed
   system if need be, or run everything in a single process.

We could have gone with lower-level abstraction, e.g., thread pool
with coroutines, but decided that we get more mileage from an actor
runtime. We see coroutines just as the syntactic sugar that make the
inversion of control more reasonable to understand through straight-line
code, but it's not a new messaging capability in the context of the actor
model implementation we use, which allows for arbitrary messaging and
synchronization patterns - all fully asynchronous, i.e., non-blocking by
yielding to the scheduler. Calling .then(...) on when a message arrives is
effectively a future, and we frequently create response promises and
message delegation patterns, often implicitly through the runtime simply by
returning a value in a lambda. We also challenged the overhead of an actor
compared to a light-weight task, but found that even creating millions of
actors in parallel in CAF still doesn't cause memory pressure or
substantially more cache misses. At the end, we could not find a reason to
*not* go with CAF, and we don't regret this choice to date. To date, we
work with the experimental credit-based streaming feature of CAF that gives
Flink-like streaming semantics though actor-based backpressure. Once again,
a single powerful abstraction to pretty much address all our needs in
scalable distributed systems that is close to the hardware as well.

I hope this helps making better decisions in finding the right abstractions
for you. I've used the actor model as a vehicle for my arguments, but there
are other isomorphic models and vocabulary (e.g., CSP). The main point I
want to get across is that thinking too low-level and single-solution (only
threapools, just coroutines, etc.) may result in a local instead of global
optimum.

    Matthias


On Mon, Sep 21, 2020 at 9:38 PM Ben Kietzman <b...@ursacomputing.com> wrote:

> FWIW boost.coroutine and boost.asio provide composable coroutines,
> non blocking IO, and configurable scheduling for CPU work out of the box.
>
> The boost libraries are not lightweight but they are robust and
> cross platform, so I think asio is worth consideration.
>
> On Sat, Sep 19, 2020 at 8:22 PM Wes McKinney <wesmck...@gmail.com> wrote:
>
> > I took a look at https://github.com/kpamnany/partr and Julia's
> > production iteration of that -- kpamnany/partr depends on
> > libconcurrent's coroutine implementation which does not work on
> > Windows. It appears that Julia is using libuv instead. If we're
> > looking for a lighter-weight C coroutine implementation, there is
> > http://software.schmorp.de/pkg/libcoro.html, but either way there is
> > quite a bit of systems work to create something that can work for
> > Arrow.
> >
> > I don't have an intuition whether depth-first scheduling (what Julia
> > is doing) or breadth-first scheduling (aka "work stealing" -- which is
> > what Intel's TBB library does [1]) will work better for our use cases.
> > But I believe that we need to figure out a programming model (probably
> > based on composable futures and continuations given what we are
> > already doing) that hides the details of which coroutine/threading
> > runtime.
> >
> > A follow-on project would likely be to define a non-blocking API for
> > our various IO interfaces that composes with the rest of the thread
> > scheduling machinery.
> >
> > Either way, this problem is definitely non-trivial so we should figure
> > out what "default" approach we can implement that is compatible with
> > our "minimal dependency core build" approach in C++ (which may involve
> > vendoring some third party code, but not sure if vendoring TBB is a
> > good idea) and go and do that. If anyone would like to be funded to
> > work on this problem, please get in touch with me offline.
> >
> > Thanks
> > Wes
> >
> > [1]:
> >
> https://software.intel.com/content/www/us/en/develop/blogs/the-work-isolation-functionality-in-intel-threading-building-blocks-intel-tbb.html
> >
> > On Sat, Sep 19, 2020 at 5:21 PM Weston Pace <weston.p...@gmail.com>
> wrote:
> > >
> > > Ok, my skill with C++ got in the way of my ability to put something
> > > together.  First, I did not realize that C++ futures were a little
> > > different than the definition I'm used to for futures.  By default,
> > > C++ futures are not composable, you can't add continuations with
> > > `then`, `when_all` or `when_any`.  There is an extension for this (not
> > > sure if it will make it even in C++20) and there are continuations for
> > > futures in boost's futures.  However, since arrow is currently using
> > > its own future implementation I could not use either of these
> > > libraries.  I spent a bit trying to add continuations to arrow's
> > > future implementation but my lack of skill with C++ got in the way.  I
> > > want to keep working on it but it may be a few days.  In the meantime
> > > I will try and type up something more complete (with a few diagrams)
> > > to explain what I'm intending.
> > >
> > > Having looked at the code for a while I do have a better sense of what
> > > is involved.  I think it would be a pretty extensive set of changes.
> > > Also, it looks like C++20 is planning on adopting co-routines which
> > > they will be using for sequential async.  So perhaps it makes more
> > > sense to go directly to coroutines instead of moving to composable
> > > futures and then later to coroutines at some point in the future.
> > >
> > > Also, re: Julia, I looked into it a bit further and Julia is using
> > > libuv under the hood for all file I/O (which is non-blocking I/O).
> > > Also async/await are built into the bones of Julia.  As far as I can
> > > tell from my brief examination is that there is no way to have a Julia
> > > task that is performing blocking I/O (in the sense that a "thread pool
> > > thread" is blocked on I/O.  You can have blocking I/O in the
> > > async/await sense where you are awaiting on I/O to maintain sequential
> > > semantics.
> > >
> > > On Wed, Sep 16, 2020 at 8:10 AM Weston Pace <weston.p...@gmail.com>
> > wrote:
> > > >
> > > > If you want to specifically look at the problem of dataset scanning,
> > > > file scanning, and nested parallelism then probably the lowest effort
> > > > improvement would be to eliminate the whole idea of "scan threads".
> > > > You currently have...
> > > >
> > > >     for (size_t i = 0; i < readers.size(); ++i) {
> > > >         ARROW_ASSIGN_OR_RAISE(futures[i],
> pool->Submit(ReadColumnFunc,
> > i));
> > > >     }
> > > >     Status final_status;
> > > >     for (auto& fut : futures) {
> > > >         final_status &= fut.status();
> > > >     }
> > > >     // Hiding some follow-up aggregation and the next line is a bit
> > abbreviated
> > > >     return Validate();
> > > >
> > > > You're already using futures so it would be pretty straightforward to
> > > > change that to
> > > >
> > > >     for (size_t i = 0; i < readers.size(); ++i) {
> > > >         ARROW_ASSIGN_OR_RAISE(futures[i],
> pool->Submit(ReadColumnFunc,
> > i));
> > > >     }
> > > >     // Hiding some follow-up aggregation and the next line is a bit
> > abbreviated
> > > >     return
> >
> std::experimental::when_all(futures).then(FollowUpAggregation).then(Validate);
> > > >
> > > > Dataset scans are currently using a threaded task group.  Those would
> > > > change to std::experimental::when_all instead.  So now the dataset
> > > > scan is not creating N threads but again just returning a composed
> > > > future.  So if you have one dataset scan across 4 files and each file
> > > > kicks off 10 column reader tasks then you have 40 "threads" submitted
> > > > to your thread pool and the main calling thread waiting on the
> future.
> > > > All of these thread pool threads are inner worker threads.  None of
> > > > these thread pool threads have to wait on other threads.  There is no
> > > > possibility of deadlock.
> > > >
> > > > You can do this at each level of nesting so that only your inner most
> > > > worker threads are actually calling `pool->Submit`.  There is then
> > > > just one outer main thread (presumably not a thread pool thread) that
> > > > is waiting on the future.  It's not a super small change because now
> > > > FileReaderImpl::ReadRowGroups returns a future.  That would have to
> > > > propagate all the way up so that your dataset scan itself is
> returning
> > > > a future (you can safely synchronize it at this point so your public
> > > > API remains synchronous because no public API call is going to be
> > > > arriving on a thread pool thread).
> > > >
> > > > That at least solves the deadlock problem.  It also starts to
> > > > propagate futures throughout the code base which could be good or bad
> > > > depending on your view of such things.  It does not solve the
> > > > under-utilization problem because you still have threads sitting in
> > > > the thread pool waiting on blocking I/O.
> > > >
> > > > The next step would be to move to non-blocking I/O.  At this point
> you
> > > > have quite a few choices.
> > > >
> > > > On Wed, Sep 16, 2020 at 7:26 AM Wes McKinney <wesmck...@gmail.com>
> > wrote:
> > > > >
> > > > > On Wed, Sep 16, 2020 at 10:31 AM Jorge Cardoso Leitão
> > > > > <jorgecarlei...@gmail.com> wrote:
> > > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I am not sure I fully understand, so I will try to give an
> example
> > to
> > > > > > check: we have a simple query that we want to write the result to
> > some
> > > > > > place:
> > > > > >
> > > > > > SELECT t1.b * t2.b FROM t1 JOIN ON t2 WHERE t1.a = t2.a
> > > > > >
> > > > > > At the physical plane, we need to
> > > > > >
> > > > > > 1. read each file in batches
> > > > > > 2. join the batches
> > > > > > 3. iterate over results and write them in partitions
> > > > > >
> > > > > > In principle, we can multi-thread them
> > > > > >
> > > > > > 1. multi-threaded scan
> > > > > > 2. multi-threaded hash join (e.g. with a shared map)
> > > > > > 3. multi-threaded write (e.g. 1 file per partition)
> > > > > >
> > > > > > The issue is that when we schedule this, the physical nodes
> > themselves
> > > > > > control how they perform their own operations, and there is no
> > > > > > orchestration as to what resources are available and what should
> be
> > > > > > prioritized. Consequently, we may have a scan of table t1 that is
> > running
> > > > > > with 12 threads, while the scan of table t2 is waiting for a
> > thread to be
> > > > > > available. This causes the computation to stall as both are
> > required for
> > > > > > step 2 to proceed. OTOH, if we have no multithreaded scans, then
> > > > > > multithreading seldom helps, as we are bottlenecked by the scans'
> > > > > > throughput. Is this the gist of the problem?
> > > > > >
> > > > > > If yes: the core issue here seems to be that there is no
> > orchestrator to
> > > > > > re-prioritize CPU to where it is needed (the scan of t2 in the
> > example
> > > > > > above), because each physical node has a thread.join that is not
> > > > > > coordinated with their downstream dependencies (and so on). Isn't
> > this a
> > > > > > natural candidate for futures/async? We seem to need some
> > coordination
> > > > > > across the DAG.
> > > > > >
> > > > > > If not: could someone offer an example describing how the
> > multi-threaded
> > > > > > scan can cause a deadlock?
> > > > >
> > > > > Suppose that we have 4 large CSV files in Amazon S3 and a static
> > > > > thread pool with 4 threads. If we use the thread pool to execute
> scan
> > > > > tasks for all 4 files in parallel, then if any of those scan tasks
> > > > > internally try to spawn tasks in the same thread pool (before other
> > > > > tasks have finished) to parallelize some of their computational
> work
> > > > > -- i.e. "nested parallelism" is what we call this -- then you have
> a
> > > > > deadlock because our current thread pool implementation cannot
> > > > > distinguish between task interdependencies / does not understand
> > > > > nested parallelism.
> > > > >
> > > > > > Best,
> > > > > > Jorge
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Wed, Sep 16, 2020 at 4:16 PM Wes McKinney <
> wesmck...@gmail.com>
> > wrote:
> > > > > >
> > > > > > > hi Jacob,
> > > > > > >
> > > > > > > The approach taken in Julia strikes me as being motivated by
> the
> > same
> > > > > > > problems that we have in this project. It would be interesting
> if
> > > > > > > partr could be used as the basis of our nested parallelism
> > runtime.
> > > > > > > How does Julia handle IO calls within spawned tasks? In other
> > words,
> > > > > > > if we have a function like:
> > > > > > >
> > > > > > > void MyTask() {
> > > > > > >   DoCPUWork();
> > > > > > >   DoSomeIO();
> > > > > > >   DoMoreCPUWork();
> > > > > > >   DoAdditionalIO();
> > > > > > > }
> > > > > > >
> > > > > > > (or maybe you just aren't supposed to do that)
> > > > > > >
> > > > > > > The biggest question would be the C++ programming model (in
> other
> > > > > > > words, how we have to change our approach to writing code) that
> > we use
> > > > > > > throughout the Arrow libraries. What I'm getting at is to
> figure
> > out
> > > > > > > how to minimize the amount of code that needs to be
> significantly
> > > > > > > altered to fit in with the new approach to work scheduling. For
> > > > > > > example, it doesn't strike me that the API that we are using to
> > > > > > > parallelize reading Parquet files at the column level is going
> > to work
> > > > > > > because there are various IO calls within the tasks that are
> > being
> > > > > > > submitted to the thread pool
> > > > > > >
> > > > > > >
> > > > > > >
> >
> https://github.com/apache/arrow/blob/apache-arrow-1.0.1/cpp/src/parquet/arrow/reader.cc#L859-L875
> > > > > > >
> > > > > > > - Wes
> > > > > > >
> > > > > > > On Wed, Sep 16, 2020 at 1:37 AM Jacob Quinn <
> > quinn.jac...@gmail.com>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > My immediate thought reading the discussion points was
> Julia's
> > task-based
> > > > > > > > multithreading model that has been part of the language for
> > over a year
> > > > > > > > now. An announcement blogpost for Julia 1.3 laid out some of
> > the details
> > > > > > > > and high-level approach:
> > > > > > > https://julialang.org/blog/2019/07/multithreading/,
> > > > > > > > and the multithreading code was marked stable in the recent
> > 1.5 release.
> > > > > > > >
> > > > > > > > Kiran, one of the main contributors to the threading model in
> > Julia,
> > > > > > > worked
> > > > > > > > on a separate C-based repo for the core functionality (
> > > > > > > > https://github.com/kpamnany/partr), but I think the latest
> > code is
> > > > > > > embedded
> > > > > > > > in the Julia source code now.
> > > > > > > >
> > > > > > > > Anyway, probably most useful as a reference, but Jameson
> > (cc'd) also does
> > > > > > > > weekly multithreading chats (on Wednesdays), so I imagine he
> > wouldn't
> > > > > > > mind
> > > > > > > > chatting about things if desired.
> > > > > > > >
> > > > > > > > -Jacob
> > > > > > > >
> > > > > > > > On Tue, Sep 15, 2020 at 8:17 PM Weston Pace <
> > weston.p...@gmail.com>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > My C++ is pretty rusty but I'll see if I can come up with a
> > concrete
> > > > > > > > > CSV example / experiment / proof of concept on Friday when
> I
> > have a
> > > > > > > > > break from work.
> > > > > > > > >
> > > > > > > > > On Tue, Sep 15, 2020 at 3:47 PM Wes McKinney <
> > wesmck...@gmail.com>
> > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > On Tue, Sep 15, 2020 at 7:54 PM Weston Pace <
> > weston.p...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > Yes.  Thank you.  I am in agreement with you and
> > futures/callbacks
> > > > > > > are
> > > > > > > > > > > one such "richer programming model for
> > > > > > > > > > > hierarchical work scheduling".
> > > > > > > > > > >
> > > > > > > > > > > A scan task with a naive approach is:
> > > > > > > > > > >
> > > > > > > > > > >     workers = partition_files_list(files_list)
> > > > > > > > > > >     for worker in workers:
> > > > > > > > > > >         start_thread(worker)
> > > > > > > > > > >     for worker in workers:
> > > > > > > > > > >         join_thread(worker)
> > > > > > > > > > >     return aggregate_results()
> > > > > > > > > > >
> > > > > > > > > > > You have N+1 threads because you have N worker threads
> > and 1 scan
> > > > > > > > > > > thread.  There is the potential for deadlock if your
> > thread pool
> > > > > > > only
> > > > > > > > > > > has one remaining spot and it is given to the scan
> > thread.
> > > > > > > > > > >
> > > > > > > > > > > On the other hand, with a futures based approach you
> > have:
> > > > > > > > > > >
> > > > > > > > > > > futures = partition_files_list(files_list)
> > > > > > > > > > > return when_all(futures).do(aggregate_results)
> > > > > > > > > > >
> > > > > > > > > > > There are only N threads.  The scan thread goes away.
> > In fact, if
> > > > > > > all
> > > > > > > > > > > of your underlying OS/FS libraries are non-blocking
> then
> > you can
> > > > > > > > > > > completely eliminate threads in the waiting state and
> an
> > entire
> > > > > > > > > > > category of deadlocks are no longer a possibility.
> > > > > > > > > >
> > > > > > > > > > I don't quite follow. I think it would be most helpful to
> > focus on a
> > > > > > > > > > concrete practical matter like reading Parquet or CSV
> > files in
> > > > > > > > > > parallel (which can be go faster through parallelism at
> > the single
> > > > > > > > > > file level) and devise a programming model in C++ that is
> > different
> > > > > > > > > > from what we are currently doing that results in superior
> > CPU
> > > > > > > > > > utilization.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > -Weston
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Sep 15, 2020 at 1:21 PM Wes McKinney <
> > wesmck...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > hi Weston,
> > > > > > > > > > > >
> > > > > > > > > > > > We've discussed some of these problems in the past --
> > I was
> > > > > > > > > > > > enumerating some of these issues to highlight the
> > problems that
> > > > > > > are
> > > > > > > > > > > > resulting from an absence of a richer programming
> > model for
> > > > > > > > > > > > hierarchical work scheduling. Parallel tasks
> > originating in each
> > > > > > > > > > > > workload are submitted to a global thread pool where
> > they are
> > > > > > > > > > > > commingled with the tasks coming from other
> workloads.
> > > > > > > > > > > >
> > > > > > > > > > > > As an example of how this can go wrong, suppose we
> > have a static
> > > > > > > > > > > > thread pool with 4 executors. If we submit 4
> > long-running tasks
> > > > > > > to
> > > > > > > > > the
> > > > > > > > > > > > pool, and then each of these tasks spawn additional
> > tasks that go
> > > > > > > > > into
> > > > > > > > > > > > the thread pool, a deadlock can occur, because the
> > thread pool
> > > > > > > thinks
> > > > > > > > > > > > that it's executing tasks when in fact those tasks
> are
> > waiting on
> > > > > > > > > > > > their dependent tasks to complete.
> > > > > > > > > > > >
> > > > > > > > > > > > A similar resource underutilization occurs when we do
> > > > > > > > > > > > pool->Submit(ReadFile), where ReadFile needs to do
> > some IO --
> > > > > > > from
> > > > > > > > > the
> > > > > > > > > > > > thread pool's perspective, the task is "working" even
> > though it
> > > > > > > may
> > > > > > > > > > > > wait for one or more IO calls to complete.
> > > > > > > > > > > >
> > > > > > > > > > > > In the Datasets API in C++ we have both of these
> > problems: file
> > > > > > > scan
> > > > > > > > > > > > tasks are being pushed onto the global thread pool,
> > and so to
> > > > > > > prevent
> > > > > > > > > > > > deadlocks multithreaded file parsing has been
> disabled.
> > > > > > > Additionally,
> > > > > > > > > > > > the scan tasks do IO, resulting in suboptimal
> > performance (the
> > > > > > > > > > > > problems caused by this will be especially
> exacerbated
> > when
> > > > > > > running
> > > > > > > > > > > > against slower filesystems like Amazon S3)
> > > > > > > > > > > >
> > > > > > > > > > > > Hopefully the issues are more clear.
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks
> > > > > > > > > > > > Wes
> > > > > > > > > > > >
> > > > > > > > > > > > On Tue, Sep 15, 2020 at 2:57 PM Weston Pace <
> > > > > > > weston.p...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > It sounds like you are describing two problems.
> > > > > > > > > > > > >
> > > > > > > > > > > > > 1) Idleness - Tasks are holding threads in the
> > thread pool
> > > > > > > while
> > > > > > > > > they
> > > > > > > > > > > > > wait for IO or some long running non-CPU task to
> > complete.
> > > > > > > These
> > > > > > > > > > > > > threads are often in a "wait" state or something
> > similar.
> > > > > > > > > > > > > 2) Fairness - The ordering of tasks is causing
> short
> > tasks that
> > > > > > > > > could
> > > > > > > > > > > > > be completed quickly from being stuck behind longer
> > term tasks.
> > > > > > > > > > > > > Fairness can be an issue even if all tasks are
> > always in the
> > > > > > > active
> > > > > > > > > > > > > state consuming CPU time.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Are both of these issues a problem?  Are you
> looking
> > to address
> > > > > > > > > both of them?
> > > > > > > > > > > > >
> > > > > > > > > > > > > I doubt it's much help as it is probably a more
> > substantial
> > > > > > > change
> > > > > > > > > > > > > than what you were looking for but the popular
> > solution to #1
> > > > > > > these
> > > > > > > > > > > > > days seems to be moving toward non blocking IO with
> > > > > > > > > > > > > promises/callbacks/async.  That way threads are
> > never in the
> > > > > > > > > waiting
> > > > > > > > > > > > > state (unless sitting idle in the pool).
> > > > > > > > > > > > >
> > > > > > > > > > > > > -Weston
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Tue, Sep 15, 2020 at 7:00 AM Wes McKinney <
> > > > > > > wesmck...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > In light of ARROW-9924, I wanted to rekindle the
> > discussion
> > > > > > > > > about our
> > > > > > > > > > > > > > approach to multithreading (especially the
> > _programming
> > > > > > > model_)
> > > > > > > > > in
> > > > > > > > > > > > > > C++. We had some discussions about this about 6
> > months ago
> > > > > > > and
> > > > > > > > > there
> > > > > > > > > > > > > > were more discussions as I recall in summer 2019.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Realistically, we are going to be consistently
> > dealing with
> > > > > > > > > > > > > > independent concurrent in-process workloads that
> > each
> > > > > > > > > respectively can
> > > > > > > > > > > > > > go faster by multithreading. These could be
> things
> > like:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > * Reading file formats (CSV, Parquet, etc.) that
> > benefit from
> > > > > > > > > > > > > > multithreaded parsing/decoding
> > > > > > > > > > > > > > * Reading one or more files in parallel using the
> > Datasets
> > > > > > > API
> > > > > > > > > > > > > > * Executing any number of multithreaded
> analytical
> > workloads
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > One obvious issue with our thread scheduling is
> > the FIFO
> > > > > > > nature
> > > > > > > > > of the
> > > > > > > > > > > > > > global thread pool. If a new independent
> > multithreaded
> > > > > > > workload
> > > > > > > > > shows
> > > > > > > > > > > > > > up, it has to wait for other workloads to
> complete
> > before the
> > > > > > > > > new work
> > > > > > > > > > > > > > will be scheduled. Think about a Flight server
> > serving
> > > > > > > queries to
> > > > > > > > > > > > > > users -- is it fair for one query to "hog" the
> > thread pool
> > > > > > > and
> > > > > > > > > force
> > > > > > > > > > > > > > other requests to wait until they can get access
> > to some CPU
> > > > > > > > > > > > > > resources? You could imagine a workload that
> > spawns 10
> > > > > > > minutes
> > > > > > > > > worth
> > > > > > > > > > > > > > of CPU work, where a new workload has to wait for
> > all of that
> > > > > > > > > work to
> > > > > > > > > > > > > > complete before having any tasks scheduled for
> > execution.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > The approach that's been taken in the Datasets
> API
> > to avoid
> > > > > > > > > problems
> > > > > > > > > > > > > > with nested parallelism (file-specific operations
> > spawning
> > > > > > > > > multiple
> > > > > > > > > > > > > > tasks onto the global thread pool) is simply to
> > disable
> > > > > > > > > multithreading
> > > > > > > > > > > > > > at the level of a single file. This is clearly
> > suboptimal.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > We have additional problems in that some
> > file-loading related
> > > > > > > > > tasks do
> > > > > > > > > > > > > > a mixture of CPU work and IO work, and once a
> > thread has been
> > > > > > > > > > > > > > dispatched to execute one of these tasks, when IO
> > takes
> > > > > > > place, a
> > > > > > > > > CPU
> > > > > > > > > > > > > > core may sit underutilized while the IO is
> waiting.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > There's more aspects we can discuss, but in
> > general I think
> > > > > > > we
> > > > > > > > > need to
> > > > > > > > > > > > > > come up with a programming model for building our
> > C++ system
> > > > > > > > > > > > > > components with the following requirements:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > * Deadlocks not possible by design
> > > > > > > > > > > > > > * Any component can safely use "nested
> > parallelism" without
> > > > > > > the
> > > > > > > > > > > > > > programmer having to worry about deadlocks or one
> > task
> > > > > > > "hogging"
> > > > > > > > > the
> > > > > > > > > > > > > > thread pool. So in other words, if there's only a
> > single
> > > > > > > > > > > > > > multithreading-capable workload running, we "let
> > it rip"
> > > > > > > > > > > > > > * Resources can be reasonably fairly allocated
> > amongst
> > > > > > > concurrent
> > > > > > > > > > > > > > workloads (think: independent requests coming in
> > through
> > > > > > > Flight,
> > > > > > > > > or
> > > > > > > > > > > > > > scan tasks on different Parquet files in the
> > Datasets API).
> > > > > > > Limit
> > > > > > > > > > > > > > scenarios where a new workload is blocked
> > altogether on the
> > > > > > > > > completion
> > > > > > > > > > > > > > of other workloads
> > > > > > > > > > > > > > * A well-defined programming pattern for tasks
> > that do a
> > > > > > > mixture
> > > > > > > > > of
> > > > > > > > > > > > > > CPU work and IO work that allows CPU cores to be
> > used when a
> > > > > > > > > task is
> > > > > > > > > > > > > > waiting on IO
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > We can't be the only project that has these
> > problems, so I'm
> > > > > > > > > > > > > > interested to see what solutions have been
> > successfully
> > > > > > > employed
> > > > > > > > > by
> > > > > > > > > > > > > > others. For example, it strikes me as similar to
> > concurrency
> > > > > > > > > issues
> > > > > > > > > > > > > > inside an analytic database. How are they
> > preventing
> > > > > > > concurrent
> > > > > > > > > > > > > > workload starvation problems or handling CPU/IO
> > task
> > > > > > > scheduling
> > > > > > > > > to
> > > > > > > > > > > > > > avoid CPU underutilization?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Choices of which threading libraries we might use
> > to
> > > > > > > implement a
> > > > > > > > > > > > > > viable solution (e.g. TBB) seem secondary to the
> > programming
> > > > > > > > > model
> > > > > > > > > > > > > > that we use to implement our components.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > Wes
> > > > > > > > >
> > > > > > >
> >
>

Reply via email to