Hi Li,
I’ll answer the questions in order:
1. Your guess is correct! The Hash Join may be used standalone (mostly in
testing or benchmarking for now) or as part of the ExecNode. The ExecNode will
pass the task to the Executor to be scheduled, or will run it immediately if
it’s in sync mode (i.e. no executor). Our Hash Join benchmark uses OpenMP to
schedule things: it passes the HashJoin a lambda that does the scheduling with
OpenMP.
2. We might not have an executor if we want to execute synchronously. This is
set during construction of the ExecContext, which is given to the ExecPlan
during creation. If the ExecContext has a nullptr Executor, then we are in
sync mode; otherwise we use the Executor to schedule. One confusing thing is
that we also have a SerialExecutor. I’m actually not quite sure what the
difference is between using that and setting the Executor to nullptr (it might
have something to do with testing?). @Weston probably knows.
3. You can think of the TaskGroup as a “parallel for loop”. TaskImpl is the
function that implements the work that needs to be split up, and
TaskGroupContinuationImpl is what gets run once after the for loop has
finished. TaskImpl will receive the index of the task. If you’re familiar with
OpenMP, it’s roughly equivalent to this:
    #pragma omp parallel for
    for (int i = 0; i < 100; i++)
        TaskImpl(omp_get_thread_num(), i);
    TaskGroupContinuationImpl();
Examples of the two are here:
https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L416
https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L458
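If it helps to see points 1 to 3 together without OpenMP, here is a minimal
sketch in plain C++. To be clear, none of these names are Arrow's real API:
Work, ScheduleCallback, MakeSyncScheduler, ThreadSpawner and RunTaskGroup are
hypothetical stand-ins I made up for illustration. The idea it shows is that
the caller supplies a schedule callback (run inline when there is no executor,
or hand off to something that runs it elsewhere), and the continuation is run
exactly once by whichever task finishes last.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <memory>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-ins for illustration only; not Arrow's actual types.
using TaskImpl = std::function<void(int thread_index, int task_id)>;
using ContinuationImpl = std::function<void()>;
using Work = std::function<void(int thread_index)>;
// The caller-provided callback decides where a unit of work runs.
using ScheduleCallback = std::function<void(Work)>;

// "Sync mode" (no executor): run the work immediately on the calling thread.
inline ScheduleCallback MakeSyncScheduler() {
  return [](Work work) { work(/*thread_index=*/0); };
}

// Toy "executor": one thread per unit of work. Callback() must only be
// invoked from a single thread, since it appends to `threads` unguarded.
struct ThreadSpawner {
  std::vector<std::thread> threads;

  ScheduleCallback Callback() {
    return [this](Work work) {
      int thread_index = static_cast<int>(threads.size());
      threads.emplace_back(std::move(work), thread_index);
    };
  }

  void JoinAll() {
    for (auto& t : threads) t.join();
    threads.clear();
  }
};

// The "parallel for": schedule num_tasks calls to `task`, then run
// `continuation` once, from whichever task happens to finish last.
inline void RunTaskGroup(int num_tasks, const TaskImpl& task,
                         const ContinuationImpl& continuation,
                         const ScheduleCallback& schedule) {
  assert(num_tasks > 0);
  // Shared countdown so the work items can outlive this function call.
  auto remaining = std::make_shared<std::atomic<int>>(num_tasks);
  for (int i = 0; i < num_tasks; ++i) {
    // [=] copies task, continuation, remaining and i into each work item.
    schedule([=](int thread_index) {
      task(thread_index, i);
      // fetch_sub returns the previous value; 1 means this was the last task.
      if (remaining->fetch_sub(1) == 1) continuation();
    });
  }
}
```

With MakeSyncScheduler() everything, including the continuation, has already
run by the time RunTaskGroup returns. With ThreadSpawner you need to call
JoinAll() before looking at the results. The real TaskScheduler is more
involved than this, so treat it as a mental model only.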
Sasha
> On Apr 25, 2022, at 8:35 AM, Li Jin <[email protected]> wrote:
>
> Hello!
>
> I am reading the use of TaskScheduler inside C++ compute code (reading hash
> join) and have some questions about it, in particular:
>
> (1) What is the purpose of SchedulerTaskCallback, defined here:
> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join_node.cc#L428
> (My guess is that the caller of TaskScheduler::StartTaskGroup needs to
> provide an implementation of a task executor, and the implementation of
> SchedulerTaskCallback inside hash_join_node.cc is just a vanilla
> implementation)
>
> (2) When would this task context not have an executor?
> https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/exec/hash_join_node.cc#L581
>
> (3) What's the difference between TaskImpl and TaskGroupContinuationImpl in
> TaskScheduler::RegisterTaskGroup? And how would one normally define
> TaskGroupContinuationImpl?
>
> Sorry I am still learning the Arrow compute internals and appreciate help
> on understanding these.
>
> Li