I'd break things into (at least) four subproblems.

# Nested fork/join Deadlock

The original problem I set out to solve was nested fork/joins leading to deadlock. In particular, the parquet reader issues a fork/join per column and the dataset scanner issues a fork/join per file. The old model blocked the thread at the "join" stage; the new model uses an asynchronous non-blocking join. Specifically, the model continues to be that Arrow will use only one CPU thread per "core" and will never block a CPU thread. This is in contrast to I/O threads (of which there may be many per core, and which can block) and user threads (threads calling into Arrow, which do block, since we don't present an asynchronous API to the user).

Work stealing doesn't really solve this particular problem by itself. For example, consider the above: if the top-level thread spins up one task per file, those tasks will immediately be stolen by all of the CPU threads (while the top-level thread is still spinning up tasks). Each of those threads will then spawn its subtasks (which can't run yet because all cores are busy) and block indefinitely.

> There are some nuances about the
> execution-ordering of tasks within the context of a single CPU core

I don't think it is a good idea to try to solve the deadlock problem this way. Also, the deadlock problem is basically solved at this point. I don't see any future work for this topic other than to stick to the above non-blocking model (or adopt a new model if needed).
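To make the difference concrete, here is a minimal, self-contained sketch (not Arrow's actual implementation; all names are invented for the example) showing why a blocking join deadlocks on a fixed-size pool while a continuation-based join cannot:

```cpp
#include <atomic>
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// Minimal fixed-size pool, just enough to contrast the two join styles.
class Pool {
 public:
  explicit Pool(size_t n) {
    for (size_t i = 0; i < n; ++i) workers_.emplace_back([this] { Run(); });
  }
  ~Pool() {
    {
      std::lock_guard<std::mutex> lk(m_);
      done_ = true;
    }
    cv_.notify_all();
    for (auto& w : workers_) w.join();
  }
  void Submit(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> lk(m_);
      q_.push_back(std::move(task));
    }
    cv_.notify_one();
  }

 private:
  void Run() {
    for (;;) {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [this] { return done_ || !q_.empty(); });
        if (q_.empty()) return;  // done_ was set and no work remains
        task = std::move(q_.front());
        q_.pop_front();
      }
      task();  // run outside the lock
    }
  }
  std::mutex m_;
  std::condition_variable cv_;
  std::deque<std::function<void()>> q_;
  std::vector<std::thread> workers_;
  bool done_ = false;
};

// Blocking join (the old model): each "file" task submits a "column"
// subtask and then parks its worker thread waiting for it.  Submit as
// many file tasks as there are workers and every worker ends up blocked
// in wait() with the column subtasks stuck behind them in the queue:
// deadlock.
void BlockingFileTask(Pool& pool) {
  auto column_done = std::make_shared<std::promise<void>>();
  pool.Submit([column_done] { column_done->set_value(); });
  column_done->get_future().wait();  // occupies a worker until the subtask runs
}

// Non-blocking join (the new model): the parent task returns immediately
// and the *last* column subtask to finish invokes the continuation.  No
// worker ever blocks, so nesting these arbitrarily cannot deadlock.
// (Assumes num_columns > 0.)
void AsyncFileTask(Pool& pool, int num_columns, std::function<void()> on_done) {
  auto remaining = std::make_shared<std::atomic<int>>(num_columns);
  for (int i = 0; i < num_columns; ++i) {
    pool.Submit([remaining, on_done] {
      // ... decode one column ...
      if (remaining->fetch_sub(1) == 1) on_done();  // the asynchronous "join"
    });
  }
}
```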
# More threading utility primitives / threading models

The second issue is the creation of utilities and patterns that make it easier for developers to write parallel code. Arrow already had a fork/join (arrow::internal::ParallelFor), executor, thread pool, and basic future/promise patterns. Futures have been extended to allow for continuations. The AsyncGenerator utilities have been added to make it easier to deal with asynchronous streams. In addition, the new ExecPlan utilities will give us a basic actor model. Finally, stop tokens are a significant feature to call out here as well.

There is plenty of future work here for both async generators and exec plan. It's possible that async generator goes away entirely (replaced by exec plan) but that isn't clear to me yet. There could be some benefit in aligning on exec plans and reducing the number of utilities that need to be maintained, but exec plans will need more work before that can happen. For futures there are some promise'ing (sorry, I had to) issues outstanding:

* Splitting future into future & promise and introducing the concept of "abandoned promises" [1]
* Exposing the asynchronous APIs externally
* Solving the "future chaining" problem (i.e. only starting a callback once there is a consumer) [2]

Another interesting avenue would be investigating some of the newer "truly async" filesystem APIs, such as io_uring, to see whether they offer any performance improvements. This topic is also unrelated to work stealing.
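As a rough illustration of the continuation style, the fork/join from the previous section can be phrased without any blocking. Read this as pseudocode: the names loosely follow arrow/util/future.h, but the signatures are from memory, and DecodeColumnAsync / AssembleTable / ColumnChunk are hypothetical helpers invented for the example:

```cpp
arrow::Future<std::shared_ptr<arrow::Table>> ReadFileAsync(int num_columns) {
  std::vector<arrow::Future<ColumnChunk>> columns;
  columns.reserve(num_columns);
  for (int i = 0; i < num_columns; ++i) {
    columns.push_back(DecodeColumnAsync(i));  // returns immediately
  }
  // The non-blocking "join": All collects the child futures and Then
  // attaches a continuation that runs when the last column completes.
  return arrow::All(std::move(columns))
      .Then([](const std::vector<arrow::Result<ColumnChunk>>& chunks) {
        return AssembleTable(chunks);
      });
}
```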
# Scheduling (coarse grained)

By scheduling I am talking about reordering how tasks get executed, for any number of reasons. For example, you might use scheduling to make more efficient use of NUMA information or to handle workflow prioritization (e.g. some user calls are "high priority" while others are "low priority"). I did my work stealing investigation to see if work stealing scheduling could be used to improve cache locality. In particular, when processing a Parquet or IPC file, does it make sense to run the column decode tasks right away, while the data is freshly loaded? I recorded my results in detail here[3].

The conclusion was that no, work stealing doesn't really help with cache locality in Arrow today (on a single-NUMA-node system). It seems (on my system at least) that the CPU's prefetching capabilities preempted any advantage I sought to gain. Without any ability to show a concrete benefit I found it hard to justify getting a PR merged, so I let it idle. However, if there is interest here, I'd be happy to revive my PRs. The first[4] generalizes the thread pool concept to allow for different task queuing strategies. The second[5] adds a work stealing implementation.

# Scheduling (fine grained)

The second potential benefit of work stealing (and one I was able to measure) is the ability to cut down on the thread pool overhead caused by locks & sharing. In other words, the benchmark here is "how many spawns per second," regardless of any real work being done. This is what was measured (and optimized) in the Tokio case study. It's also, at the moment, not terribly relevant to Arrow; you could even argue it is not a real-world problem. The issue is that Arrow just doesn't create that many thread tasks. Work stealing can take us from being able to spawn ~200k tasks/second to millions of tasks/second. However, even the most thread-task-intensive workload I could generate for Arrow (async IPC reading with maximum spawning) only generated about 10k tasks/second.

This particular topic is related to the earlier email regarding batch sizes[6]. It seems natural to assume that the smaller the batch size, the more potential thread tasks we will have (although, since the current exec plan is mostly serial, we aren't quite there yet). I would caution, though, that there are many improvements needed for small batch sizes and a more efficient scheduler is just one of them.
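For the curious, the structural reason work stealing cuts this overhead is that each worker mostly touches its own queue instead of a single shared, lock-protected one. Below is a simplified sketch of that structure, a stand-in for the lock-free Chase-Lev deques a real scheduler (such as the one in [5]) would use; the per-deque mutex here just keeps the example short and correct. The owner pushes and pops LIFO at the back, while idle workers steal FIFO from the front:

```cpp
#include <deque>
#include <functional>
#include <mutex>
#include <optional>
#include <vector>

using Task = std::function<void()>;

class WorkStealingQueues {
 public:
  explicit WorkStealingQueues(size_t num_workers) : queues_(num_workers) {}

  // Called by worker `w` to schedule a task it spawned: push onto its
  // own deque, usually with no contention at all.
  void Push(size_t w, Task task) {
    std::lock_guard<std::mutex> lk(queues_[w].m);
    queues_[w].q.push_back(std::move(task));
  }

  // Called by worker `w` to fetch its next task: local LIFO pop first
  // (the most recently spawned, cache-warm task), then scan the other
  // workers and steal their oldest task.
  std::optional<Task> Pop(size_t w) {
    {
      std::lock_guard<std::mutex> lk(queues_[w].m);
      if (!queues_[w].q.empty()) {
        Task t = std::move(queues_[w].q.back());
        queues_[w].q.pop_back();
        return t;
      }
    }
    for (size_t v = 0; v < queues_.size(); ++v) {
      if (v == w) continue;
      std::lock_guard<std::mutex> lk(queues_[v].m);
      if (!queues_[v].q.empty()) {
        Task t = std::move(queues_[v].q.front());  // steal FIFO
        queues_[v].q.pop_front();
        return t;
      }
    }
    return std::nullopt;  // caller should park or retry
  }

 private:
  struct PerWorker {
    std::mutex m;
    std::deque<Task> q;
  };
  std::vector<PerWorker> queues_;
};
```

The LIFO-local / FIFO-steal policy is also what gives child/dependent tasks priority over sibling tasks on the same core, which touches on the execution-ordering nuance raised in the quoted email below.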
[1] https://issues.apache.org/jira/browse/ARROW-12207
[2] https://issues.apache.org/jira/browse/ARROW-13004
[3] https://issues.apache.org/jira/browse/ARROW-10117
[4] https://github.com/apache/arrow/pull/10401
[5] https://github.com/apache/arrow/pull/10420
[6] https://lists.apache.org/thread.html/ra46c1052622f4f78b8cdf55d1394ef4ba11dc2f7f2821b53998e6c2d%40%3Cdev.arrow.apache.org%3E

On Tue, Aug 3, 2021 at 11:54 AM Wes McKinney <wesmck...@gmail.com> wrote:
>
> hi all,
>
> We've had some discussions in the past about our approach to nested
> parallelism (for example, reading multiple Parquet or CSV files or
> compressed Arrow IPC files in parallel, each of which can benefit from
> internal parallelism for faster parsing / decoding performance). Since
> then, there has been a lot of work on asynchronous IO and futures in
> the C++ project, and Parquet and CSV have refactored to use composable
> futures and an Executor interface has been introduced.
>
> Relatedly, I saw that Weston had worked on a work-stealing thread pool
> implementation [1]. I've read discussions in other projects on the
> nuances around multicore work scheduling, for example in Rust's Tokio
> asynchronous runtime [2].
>
> It seems to me that adopting an executor-queue-per-logical-CPU-core
> and a work-stealing scheduler is a reasonable path forward to solve
> the nested parallelism problem. There are some nuances about the
> execution-ordering of tasks within the context of a single CPU core
> (i.e. child/dependent tasks should be given priority / not be
> preempted by sibling tasks of the parent task).
>
> I'm curious what people think about a high level plan going forward
> and what are the corresponding follow up projects to make sure that
> each place in the codebase where we provide for spawning child tasks
> (e.g. when reading many Parquet or CSV files in parallel) will
> "compose" correctly to yield desirable task-execution-ordering and
> balanced execution across CPU cores.
>
> Thanks
> Wes
>
> [1]: https://github.com/apache/arrow/pull/10420
> [2]: http://tokio.rs/blog/2019-10-scheduler