Thanks! That's super helpful. A follow-up question on TaskScheduler: what's
the correct way to define a task that "does work if input batches are ready,
otherwise tries again later"? Something like:

    Status try_process() {
      if (enough_inputs_to_produce_next_output()) {
        compute_and_produce_next_output();
        return Status::OK();
      } else {
        // Is this right? Exit and try again later.
        return Status::OK();
      }
    }

If I register this function with TaskScheduler, I think it only gets run
once, so I suspect I need to schedule the next task myself when inputs are
not ready, but I am not sure of the best way to do that. Any suggestions?
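In case a concrete sketch helps: my current guess is that instead of a task
that returns early when there is nothing to do, the node should only start
the task group once inputs are actually ready, along the lines below. All of
the member names here are made up, and I am not sure whether a task group may
be started more than once -- that is really what I am asking.

    // Hypothetical node-side bookkeeping; only RegisterTaskGroup /
    // StartTaskGroup are real TaskScheduler calls (see task_util.h).
    Status OnNewInput(size_t thread_index, ExecBatch batch) {
      buffered_inputs_.push_back(std::move(batch));
      if (enough_inputs_to_produce_next_output()) {
        // Kick off one task now that there is work to do, rather than
        // having an already-running task poll for input.
        return scheduler_->StartTaskGroup(thread_index, process_group_id_,
                                          /*total_num_tasks=*/1);
      }
      return Status::OK();  // nothing ready yet; no task scheduled
    }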
Li

On Mon, Apr 25, 2022 at 6:18 PM Sasha Krassovsky <krassovskysa...@gmail.com>
wrote:

> Hi Li,
> I'll answer the questions in order:
>
> 1. Your guess is correct! The Hash Join may be used standalone (mostly in
> testing or benchmarking for now) or as part of the ExecNode. The ExecNode
> will pass the task to the Executor to be scheduled, or will run it
> immediately if it's in sync mode (i.e. no executor). Our Hash Join
> benchmark uses OpenMP to schedule things, and passes a lambda that does
> OpenMP things to the HashJoin.
>
> 2. We might not have an executor if we want to execute synchronously. This
> is set during construction of the ExecContext, which is given to the
> ExecPlan during creation. If the ExecContext has a nullptr Executor, then
> we are in sync mode; otherwise we use the Executor to schedule. One
> confusing thing is that we also have a SerialExecutor - I'm actually not
> quite sure what the difference between using that and setting the Executor
> to nullptr is (it might have something to do with testing?). @Weston
> probably knows.
>
> 3. You can think of the TaskGroup as a "parallel for loop". TaskImpl is
> the function that implements the work that needs to be split up, and
> TaskGroupContinuationImpl is what gets run after the for loop. TaskImpl
> will receive the index of the task. If you're familiar with OpenMP, it's
> equivalent to this:
>
>     #pragma omp parallel for
>     for (int i = 0; i < 100; i++)
>       TaskImpl(omp_get_thread_num(), i);
>     TaskGroupContinuationImpl();
>
> Examples of the two are here:
>
> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L416
>
> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L458
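> In TaskScheduler terms, that OpenMP example looks roughly like this
> (simplified from the hash join code, and written from memory -- see
> task_util.h for the exact signatures):
>
>     // At setup: register the "loop body" and the "after the loop" step.
>     // TaskImpl gets (thread_id, task_id); the continuation gets thread_id.
>     int group_id = scheduler->RegisterTaskGroup(
>         [](size_t thread_id, int64_t task_id) -> Status {
>           return TaskImpl(thread_id, task_id);  // body, iteration task_id
>         },
>         [](size_t thread_id) -> Status {
>           return TaskGroupContinuationImpl(thread_id);  // runs once at end
>         });
>     scheduler->RegisterEnd();
>
>     // Later: run the "parallel for" with 100 iterations.
>     RETURN_NOT_OK(scheduler->StartTaskGroup(thread_id, group_id,
>                                             /*total_num_tasks=*/100));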
> Sasha
>
> > On Apr 25, 2022, at 8:35 AM, Li Jin <ice.xell...@gmail.com> wrote:
> >
> > Hello!
> >
> > I am reading the use of TaskScheduler inside the C++ compute code
> > (reading hash join) and have some questions about it, in particular:
> >
> > (1) What's the purpose of the SchedulerTaskCallback defined here:
> > https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join_node.cc#L428
> > (My guess is that the caller of TaskScheduler::StartTaskGroup needs to
> > provide an implementation of a task executor, and the implementation of
> > SchedulerTaskCallback inside hash_join_node.cc is just a vanilla
> > implementation.)
> >
> > (2) When would this task context not have an executor?
> > https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/exec/hash_join_node.cc#L581
> >
> > (3) What's the difference between TaskImpl and TaskGroupContinuationImpl
> > in TaskScheduler::RegisterTaskGroup? And how would one normally define
> > TaskGroupContinuationImpl?
> >
> > Sorry, I am still learning the Arrow compute internals and appreciate
> > help on understanding these.
> >
> > Li
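For concreteness, the "vanilla" implementation Li guesses at in (1) -- run
the task on the Executor if there is one, otherwise run it inline -- can be
sketched as below. This is a simplified illustration, not the actual
hash_join_node.cc code, and thread_indexer() is an assumed helper that maps
the calling thread to a dense index:

    // A SchedulerTaskCallback tells TaskScheduler *how* to run one task.
    Status ScheduleTask(std::function<Status(size_t)> task) {
      arrow::internal::Executor* executor = exec_context->executor();
      if (executor != nullptr) {
        // Async mode: push the task onto the executor's thread pool.
        return executor->Spawn([task]() {
          Status st = task(thread_indexer());
          // (real code forwards a failing status to the node's error path)
        });
      }
      // Sync mode (nullptr executor): run the task right here.
      return task(/*thread_index=*/0);
    }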