Thanks Sasha, your intuition on the SerialExecutor is correct. One of the changes I am working on[1] will make it so that an executor is always present. The behavior when you do not have an executor is rather strange (sometimes I/O threads are used and sometimes the calling thread is used), which makes performance hard to reason about. I plan to get back to this PR once 8.0.0 is finished, but it will unfortunately require some changes to the scanner.
I do not believe the TaskScheduler handles "do work when inputs are ready, otherwise try again later". It was originally written with join tasks in mind, where we can't do any work until all of the input has arrived. So the pattern there is "queue everything in the node until we are ready to process it all, and then run the task scheduler".

Generally speaking, when a batch arrives and you realize you have enough data to do some work, you should just do it there. This is how the filter & project nodes operate. They rely on the fact that we will be processing many batches in parallel to get their parallelism (i.e. there is no need to process a single batch in parallel). However, if you've accumulated enough data that you want to process it in parallel, then you should create a task scheduler at that point. For example, if you have queued up batches 1, 2, 3, and 4 (out of 20 total batches) and are now ready to process them, you could create a TaskScheduler then. You could find yourself creating multiple task schedulers throughout the run of a node. The continuation would mark batches 1/2/3/4 done, send out any necessary results, and, if that was the last batch group to finish, mark the node finished.

This feels like a general pattern. We could probably build some abstractions and common utilities around it if you find yourself up to it.

Just to continue the example, let's pretend you can do work once you have 4 batches. With 20 total batches there would be 5 "mega-tasks" worth of work. Each "mega-task" might run in parallel with other "mega-tasks". Each mega-task would also, itself, spawn smaller parallel sub-tasks. Each mega-task would be responsible for delivering its own output. The last mega-task to finish is also responsible for finishing the node.

[1] https://github.com/apache/arrow/pull/12468

On Mon, Apr 25, 2022 at 12:19 PM Sasha Krassovsky <krassovskysa...@gmail.com> wrote:
>
> Hi Li,
> I’ll answer the questions in order:
>
> 1.
> Your guess is correct! The Hash Join may be used standalone (mostly in testing or benchmarking for now) or as part of the ExecNode. The ExecNode will pass the task to the Executor to be scheduled, or will run it immediately if it’s in sync mode (i.e. no executor). Our Hash Join benchmark uses OpenMP to schedule things, and passes a lambda that does OpenMP things to the HashJoin.
>
> 2. We might not have an executor if we want to execute synchronously. This is set during construction of the ExecContext, which is given to the ExecPlan during creation. If the ExecContext has a nullptr Executor, then we are in sync mode; otherwise we use the Executor to schedule. One confusing thing is that we also have a SerialExecutor - I’m actually not quite sure what the difference is between using that and setting the Executor to nullptr (it might have something to do with testing?). @Weston probably knows.
>
> 3. You can think of the TaskGroup as a “parallel for loop”. TaskImpl is the function that implements the work that needs to be split up; TaskGroupContinuationImpl is what gets run after the for loop. TaskImpl will receive the index of the task.
> If you’re familiar with OpenMP, it’s equivalent to this:
>
> #pragma omp parallel for
> for(int i = 0; i < 100; i++)
>   TaskImpl(omp_get_thread_num(), i);
> TaskGroupContinuationImpl();
>
> Examples of the two are here:
> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L416
> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L458
>
> Sasha
>
> > On Apr 25, 2022, at 8:35 AM, Li Jin <ice.xell...@gmail.com> wrote:
> >
> > Hello!
> >
> > I am reading the use of TaskScheduler inside the C++ compute code (reading hash join) and have some questions about it, in particular:
> >
> > (1) What is the purpose of the SchedulerTaskCallback defined here:
> > https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join_node.cc#L428
> > (My guess is that the caller of TaskScheduler::StartTaskGroup needs to provide an implementation of a task executor, and the implementation of SchedulerTaskCallback inside hash_join_node.cc is just a vanilla implementation.)
> >
> > (2) When would this task context not have an executor?
> > https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/exec/hash_join_node.cc#L581
> >
> > (3) What's the difference between TaskImpl and TaskGroupContinuationImpl in TaskScheduler::RegisterTaskGroup? And how would one normally define TaskGroupContinuationImpl?
> >
> > Sorry, I am still learning the Arrow compute internals and appreciate help in understanding these.
> >
> > Li
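P.S. To make the "mega-task" pattern from my reply concrete, here is a toy, self-contained C++ sketch. None of these names come from Arrow - BatchAccumulator, OnBatchArrived, RunMegaTask, etc. are all invented for illustration, and the group is processed serially where Arrow would spin up a TaskScheduler to run sub-tasks in parallel. It only shows the bookkeeping: queue batches as they arrive, run a "mega-task" per full group of four, and let the continuation of the last group finish the node.

```cpp
#include <atomic>
#include <cassert>
#include <vector>

// Toy sketch (not Arrow's actual API) of the pattern described above:
// accumulate incoming batches; once a group of kGroupSize is queued, run a
// "mega-task" over that group; each mega-task's continuation marks the group
// done, and the last group to finish also finishes the node.
struct BatchAccumulator {
  static constexpr int kGroupSize = 4;
  int total_batches;
  std::vector<int> queued;          // indices of batches waiting for a group
  std::atomic<int> groups_done{0};  // atomic: mega-tasks may run in parallel
  bool node_finished = false;

  explicit BatchAccumulator(int total) : total_batches(total) {}

  // Called once per arriving batch (e.g. from InputReceived).
  void OnBatchArrived(int batch_index) {
    queued.push_back(batch_index);
    if (static_cast<int>(queued.size()) == kGroupSize) {
      std::vector<int> group;
      group.swap(queued);  // hand the full group off to a mega-task
      RunMegaTask(group);
    }
  }

  // In Arrow this is where a TaskScheduler would be created to run parallel
  // sub-tasks over the group; here we just "process" each batch serially.
  void RunMegaTask(const std::vector<int>& group) {
    for (int batch : group) {
      (void)batch;  // ... process batch, deliver this mega-task's output ...
    }
    Continuation();
  }

  // The continuation: mark this group done; the last one finishes the node.
  void Continuation() {
    int done = ++groups_done;
    if (done * kGroupSize >= total_batches) node_finished = true;
  }
};
```

With 20 batches and groups of 4, this runs 5 mega-tasks, and the fifth continuation marks the node finished, matching the example in the reply above.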