I'd break things into (at least) four subproblems.

# Nested fork/join deadlock

The original problem I set out to solve was nested fork/joins leading
to deadlock.  In particular, the parquet reader issues a fork/join per
column and the dataset scanner issues a fork/join per file.  The old
model blocked the thread on the "join" stage; the new model uses an
asynchronous, non-blocking join.  Specifically, the model continues to
be that Arrow uses only one CPU thread per core and never blocks a CPU
thread.  This is in contrast to I/O threads (of which there may be
many per core, and which may block) and user threads (threads calling
into Arrow), which do block because we don't present an asynchronous
API to the user.
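To make the distinction concrete, here is a minimal sketch (not
Arrow's actual classes) of what an asynchronous, non-blocking join
looks like: instead of parking a CPU thread in a wait, the "join" step
is registered as a continuation.  SimpleFuture is a hypothetical
stand-in; Arrow's real Future supports continuations along these
lines.

```cpp
#include <functional>
#include <iostream>
#include <optional>

// Toy single-threaded future; a real one needs synchronization.
template <typename T>
class SimpleFuture {
 public:
  // Register a continuation; runs immediately if the value already arrived.
  void Then(std::function<void(const T&)> cb) {
    callback_ = std::move(cb);
    if (value_) callback_(*value_);
  }
  // Deliver the value; runs the continuation if one is registered.
  void Fulfill(T value) {
    value_ = std::move(value);
    if (callback_) callback_(*value_);
  }

 private:
  std::function<void(const T&)> callback_;
  std::optional<T> value_;
};

int main() {
  SimpleFuture<int> column_done;
  // Old model: a worker would block here (e.g. a Wait() call), occupying
  // a CPU thread until the column finished decoding.
  // New model: register the join as a continuation and return
  // immediately, freeing this CPU thread to run other tasks.
  column_done.Then([](const int& rows) {
    std::cout << "column decoded, " << rows << " rows\n";
  });
  // ...later, whichever thread finishes the decode fulfills the future:
  column_done.Fulfill(1024);
  return 0;
}
```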

Work stealing doesn't really solve this particular problem by itself.
For example, consider the scenario above: if the top-level thread
spins up one task per file, those tasks will immediately be stolen by
all the CPU threads (while the top-level thread is still spinning up
tasks).  Each of those threads will then spawn its subtasks (which
can't run yet because all cores are busy) and block indefinitely.
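A minimal sketch of that failure mode, assuming a deliberately simple
mutex+condvar pool (Pool below is hypothetical, not Arrow's
ThreadPool): with 2 workers and 2 "file" tasks that block on a join,
the "column" subtasks can never be scheduled and the pool hangs.  Work
stealing does not help because the workers are blocked, not idle.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <cstdlib>
#include <deque>
#include <functional>
#include <future>
#include <mutex>
#include <thread>

class Pool {
 public:
  explicit Pool(int n) {
    for (int i = 0; i < n; ++i) std::thread([this] { Run(); }).detach();
  }
  void Spawn(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> lk(mu_);
      queue_.push_back(std::move(task));
    }
    cv_.notify_one();
  }

 private:
  void Run() {
    for (;;) {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> lk(mu_);
        cv_.wait(lk, [this] { return !queue_.empty(); });
        task = std::move(queue_.front());
        queue_.pop_front();
      }
      task();
    }
  }
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<std::function<void()>> queue_;
};

int main() {
  Pool pool(2);  // two "CPU" threads
  std::atomic<int> started{0};
  for (int file = 0; file < 2; ++file) {
    pool.Spawn([&pool, &started] {
      // Wait until both "file" tasks occupy the two workers, to make
      // the deadlock deterministic for this demonstration.
      ++started;
      while (started.load() < 2) std::this_thread::yield();
      // Each per-file task forks a per-column subtask...
      std::promise<void> column_done;
      std::future<void> fut = column_done.get_future();
      pool.Spawn([&column_done] { column_done.set_value(); });
      // ...and blocks on the join.  Both workers are parked here, so
      // the column subtasks queued above can never run.
      fut.wait();
    });
  }
  std::this_thread::sleep_for(std::chrono::seconds(1));
  std::printf("pool is deadlocked: the column subtasks never ran\n");
  std::_Exit(0);  // hard exit; the blocked workers will never finish
}
```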

> There are some nuances about the
> execution-ordering of tasks within the context of a single CPU core

I don't think it is a good idea to try to solve the deadlock problem
this way.  Also, the deadlock problem is basically solved at this
point.  I don't see any future work for this topic other than sticking
to the above non-blocking model (or adopting a new model if needed).

# More threading utility primitives / threading models

The second issue is the creation of utilities and patterns that make
it easier for developers to write parallel code.  Arrow already had a
fork/join utility (internal::ParallelFor), an executor, a thread pool,
and basic future/promise patterns.  Futures have been extended to
allow for continuations.  The AsyncGenerator utilities have been added
to make it easier to deal with asynchronous streams.  In addition, the
new ExecPlan utilities will give us a basic actor model.  Finally,
stop tokens are a significant feature to call out here as well.
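The core idea behind AsyncGenerator is that a pull-based asynchronous
stream is just a callable that returns a future per item.  Here is a
toy analogue using std::future; MakeVectorGenerator is a made-up
helper and Arrow's actual utilities are richer than this sketch.

```cpp
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <optional>
#include <vector>

// A pull-based asynchronous stream: each call returns a future for the
// next item.  std::nullopt signals end-of-stream in this sketch.
template <typename T>
using AsyncGenerator = std::function<std::future<std::optional<T>>()>;

// Made-up helper: wraps a vector as an (already-completed) generator.
AsyncGenerator<int> MakeVectorGenerator(std::vector<int> items) {
  auto data = std::make_shared<std::vector<int>>(std::move(items));
  auto index = std::make_shared<std::size_t>(0);
  return [data, index]() {
    std::promise<std::optional<int>> next;
    if (*index < data->size()) {
      next.set_value((*data)[(*index)++]);
    } else {
      next.set_value(std::nullopt);  // stream exhausted
    }
    return next.get_future();
  };
}

int main() {
  AsyncGenerator<int> gen = MakeVectorGenerator({1, 2, 3});
  // A consumer pulls items one at a time; with a real async source the
  // futures would complete later instead of immediately.
  while (std::optional<int> item = gen().get()) {
    std::cout << *item << "\n";
  }
  return 0;
}
```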

There is plenty of future work here for both async generators and
exec plans.  It may be that the async generator goes away entirely
(replaced by exec plan), but that isn't clear to me yet.  There could
be some benefit in aligning on exec plans and reducing the number of
utilities that need to be maintained, but exec plans will need more
work before that can happen.  For futures, there are some promise'ing
(sorry, I had to) issues outstanding:

 * Splitting future into future & promise and introducing the concept
of "abandoned promises" [1] (see the sketch after this list)
 * Exposing the asynchronous APIs externally
 * Solving the "future chaining" problem (i.e. only starting a
callback once there is a consumer) [2]
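For the first bullet, the standard library already models the
"abandoned promise" concept: destroying a std::promise without
fulfilling it makes the matching future fail with broken_promise
rather than hanging a consumer forever.  A minimal demonstration
(Arrow's Future is currently a combined future/promise, which is part
of what [1] proposes to split):

```cpp
#include <future>
#include <iostream>

int main() {
  std::future<int> fut;
  {
    std::promise<int> p;
    fut = p.get_future();
    // p is destroyed here without set_value(): it is "abandoned".
  }
  try {
    fut.get();
  } catch (const std::future_error& e) {
    // e.code() == std::future_errc::broken_promise
    std::cout << "abandoned promise detected: " << e.what() << "\n";
  }
  return 0;
}
```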

Another interesting avenue would be investigating some of the newer
"truly async" filesystem APIs, such as io_uring, to see if they offer
any performance improvements.
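For reference, a minimal single-read sketch with liburing (compile
with -luring); "data.parquet" is a hypothetical input file, and Arrow's
filesystem layer does not currently use io_uring:

```cpp
#include <liburing.h>

#include <fcntl.h>
#include <unistd.h>

#include <cstdio>

int main() {
  struct io_uring ring;
  if (io_uring_queue_init(8, &ring, 0) < 0) return 1;  // 8-entry queue

  int fd = open("data.parquet", O_RDONLY);  // hypothetical input file
  if (fd < 0) return 1;

  char buf[4096];
  struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);
  io_uring_prep_read(sqe, fd, buf, sizeof(buf), /*offset=*/0);
  io_uring_submit(&ring);  // submission does not block the caller

  // A real integration would poll for completions from a dedicated
  // thread and fulfill a Future; here we just wait synchronously.
  struct io_uring_cqe* cqe;
  io_uring_wait_cqe(&ring, &cqe);
  std::printf("read %d bytes\n", cqe->res);
  io_uring_cqe_seen(&ring, cqe);

  io_uring_queue_exit(&ring);
  close(fd);
  return 0;
}
```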

This topic is also unrelated to the topic of work stealing.

# Scheduling (coarse grained)

By scheduling I am talking about reordering how tasks get executed,
for any number of reasons.  For example, you might use scheduling to
make more efficient use of NUMA information or to handle workflow
prioritization (e.g. some user calls are "high priority" while others
are "low priority").

I did my work stealing investigation to see if work stealing could be
used to improve cache locality.  In particular, when processing a
parquet or IPC file, does it make sense to run the column decode tasks
right away, while the data is freshly loaded?  I recorded my results
in detail here [3].  The conclusion was that no, work stealing doesn't
really help with cache locality in Arrow today (on a single NUMA node
system).  It seems (on my system at least) that the CPU's prefetching
capabilities preempted any advantage I sought to gain.

Without any ability to show a concrete benefit I found it hard to
justify getting a PR merged, so I let the work idle.  However, if
there is interest here, I'd be happy to revive my PRs.  The first [4]
generalizes the thread pool concept to allow for different task
queuing strategies.  The second [5] adds a work stealing
implementation.
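To give a flavor of the kind of generalization [4] describes, here is
a hypothetical sketch of a pluggable task-queuing strategy (the names
below are illustrative, not the PR's actual classes; locking is
omitted for brevity):

```cpp
#include <deque>
#include <functional>
#include <optional>

using Task = std::function<void()>;

// The pool delegates queuing decisions to a strategy object, so FIFO,
// LIFO, priority, or work-stealing queues can be swapped in.
class TaskQueueStrategy {
 public:
  virtual ~TaskQueueStrategy() = default;
  virtual void Push(Task task) = 0;
  // Returns std::nullopt if no task is available.
  virtual std::optional<Task> Pop() = 0;
};

// Simple FIFO strategy.  A work-stealing strategy would instead keep
// one deque per worker and let idle workers pop from the far end of
// other workers' deques.
class FifoStrategy : public TaskQueueStrategy {
 public:
  void Push(Task task) override { queue_.push_back(std::move(task)); }
  std::optional<Task> Pop() override {
    if (queue_.empty()) return std::nullopt;
    Task t = std::move(queue_.front());
    queue_.pop_front();
    return t;
  }

 private:
  std::deque<Task> queue_;
};
```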

# Scheduling (fine grained)

The second potential benefit of work stealing (and one I was able to
measure) is the ability to cut down on the thread pool overhead caused
by locks & sharing.  In other words, the benchmark here measures "how
many spawns per second" regardless of any real work being done.  This
is what was measured (and optimized) in the Tokio case study.  It's
also, at the moment, not terribly relevant to Arrow.  You could
potentially argue it is not a real-world problem either.  The problem
is that Arrow just doesn't create that many thread tasks.  Work
stealing can take us from being able to spawn ~200k tasks/second to
millions of tasks/second.  However, even the most task-intensive
workload I could generate for Arrow (async IPC reading with maximum
spawning) only generated about 10k tasks/second.
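A hypothetical micro-benchmark in the spirit of that "spawns per
second" measurement: enqueue no-op tasks into a single shared,
mutex-protected queue for one second and count them.  The single
contended queue below is the bottleneck that a work-stealing pool
removes by giving each worker its own deque.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

int main() {
  std::mutex mu;
  std::condition_variable cv;
  std::deque<std::function<void()>> queue;
  std::atomic<bool> done{false};

  // A few workers draining the shared queue (the contention source).
  std::vector<std::thread> workers;
  for (int i = 0; i < 4; ++i) {
    workers.emplace_back([&] {
      for (;;) {
        std::function<void()> task;
        {
          std::unique_lock<std::mutex> lk(mu);
          cv.wait(lk, [&] { return done.load() || !queue.empty(); });
          if (queue.empty()) return;  // done and fully drained
          task = std::move(queue.front());
          queue.pop_front();
        }
        task();  // no-op: we only measure spawn overhead
      }
    });
  }

  // Spawn as fast as possible for one second.
  long long spawns = 0;
  auto end = std::chrono::steady_clock::now() + std::chrono::seconds(1);
  while (std::chrono::steady_clock::now() < end) {
    {
      std::lock_guard<std::mutex> lk(mu);
      queue.push_back([] {});
    }
    cv.notify_one();
    ++spawns;
  }

  done = true;
  cv.notify_all();
  for (auto& w : workers) w.join();
  std::cout << spawns << " spawns/second\n";
  return 0;
}
```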

This particular topic is related to the earlier email regarding batch
sizes [6].  It seems natural to assume that the smaller the batch
size, the more potential thread tasks we will have (although, since
the current exec plan is mostly serial, we aren't quite there yet).
However, I would caution that many improvements are needed for small
batch sizes, and a more efficient scheduler is just one of them.

[1] https://issues.apache.org/jira/browse/ARROW-12207
[2] https://issues.apache.org/jira/browse/ARROW-13004
[3] https://issues.apache.org/jira/browse/ARROW-10117
[4] https://github.com/apache/arrow/pull/10401
[5] https://github.com/apache/arrow/pull/10420
[6] https://lists.apache.org/thread.html/ra46c1052622f4f78b8cdf55d1394ef4ba11dc2f7f2821b53998e6c2d%40%3Cdev.arrow.apache.org%3E


On Tue, Aug 3, 2021 at 11:54 AM Wes McKinney <wesmck...@gmail.com> wrote:
>
> hi all,
>
> We've had some discussions in the past about our approach to nested
> parallelism (for example, reading multiple Parquet or CSV files or
> compressed Arrow IPC files in parallel, each of which can benefit from
> internal parallelism for faster parsing / decoding performance). Since
> then, there has been a lot of work on asynchronous IO and futures in
> the C++ project, and Parquet and CSV have refactored to use composable
> futures and an Executor interface has been introduced.
>
> Relatedly, I saw that Weston had worked on a work-stealing thread pool
> implementation [1]. I've read discussions in other projects on the
> nuances around multicore work scheduling, for example in Rust's Tokio
> asynchronous runtime [2].
>
> It seems to me that adopting an executor-queue-per-logical-CPU-core
> and a work-stealing scheduler is a reasonable path forward to solve
> the nested parallelism problem. There are some nuances about the
> execution-ordering of tasks within the context of a single CPU core
> (i.e. child/dependent tasks should be given priority / not be
> preempted by sibling tasks of the parent task).
>
> I'm curious what people think about a high level plan going forward
> and what are the corresponding follow up projects to make sure that
> each place in the codebase where we provide for spawning child tasks
> (e.g. when reading many Parquet or CSV files in parallel) will
> "compose" correctly to yield desirable task-execution-ordering and
> balanced execution across CPU cores.
>
> Thanks
> Wes
>
> [1]: https://github.com/apache/arrow/pull/10420
> [2]: http://tokio.rs/blog/2019-10-scheduler
