Thanks! That's super helpful.

A follow-up question on TaskScheduler - what's the correct way to define a
task that "does work if input batches are ready, otherwise tries again later"?

Something like:

Status try_process() {
  if (enough_inputs_to_produce_next_output) {
    compute_and_produce_next_output();
    return Status::OK();
  } else {
    // Is this right?
    // Exit and try later
    return Status::OK();
  }
}

If I register this function with TaskScheduler, I think it only gets run
once, so I suspect I need to schedule a follow-up task for the case where
inputs are not ready, but I am not sure of the best way to do that. Any
suggestions?
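To make it concrete, here is the kind of pattern I am imagining (a rough
sketch only; on_input_batch, buffered_batches_, and
enough_inputs_to_produce_next_output are hypothetical names, and group_id_
would come from an earlier RegisterTaskGroup call):

// Hypothetical sketch: rather than having the task poll for input, only
// start the task group once enough input has actually arrived.
Status on_input_batch(size_t thread_id, ExecBatch batch) {
  buffered_batches_.push_back(std::move(batch));
  if (enough_inputs_to_produce_next_output()) {
    // Start one task that computes and produces the next output.
    return scheduler_->StartTaskGroup(thread_id, group_id_, /*num_tasks=*/1);
  }
  return Status::OK();  // nothing to do yet; wait for the next batch
}

Is that the intended usage, or is there a built-in way to say "retry later"?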

Li

On Mon, Apr 25, 2022 at 6:18 PM Sasha Krassovsky <krassovskysa...@gmail.com>
wrote:

> Hi Li,
> I’ll answer the questions in order:
>
> 1. Your guess is correct! The Hash Join may be used standalone (mostly in
> testing or benchmarking for now) or as part of the ExecNode. The ExecNode
> will pass the task to the Executor to be scheduled, or will run it
> immediately if it’s in sync mode (i.e. no executor). Our Hash Join
> benchmark uses OpenMP to schedule things, and passes a lambda that does
> OpenMP things to the HashJoin.
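>
> In rough sketch form (treat this as an approximation - the exact
> SchedulerTaskCallback signature is in hash_join_node.cc / task_util.h, and
> executor_ / thread_indexer_ here are stand-ins for the node's members), the
> callback just decides where a task runs:
>
> // Approximate shape: given a unit of work, either run it inline
> // (sync mode, no executor) or hand it to the Executor.
> auto schedule_task = [this](std::function<Status(size_t)> task) -> Status {
>   if (executor_ == nullptr) {
>     return task(/*thread_id=*/0);  // sync: run immediately
>   }
>   // async: submit to the Executor's thread pool
>   return executor_->Spawn([this, task]() {
>     Status st = task(thread_indexer_());  // status handling elided
>   });
> };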
>
> 2. We might not have an executor if we want to execute synchronously. This
> is set during construction of the ExecContext, which is given to the
> ExecPlan during creation. If the ExecContext has a nullptr Executor, then
> we are in sync mode; otherwise we use the Executor to schedule. One
> confusing thing is that we also have a SerialExecutor - I’m actually not
> quite sure what the difference is between using that and setting the
> Executor to nullptr (it might have something to do with testing?). @Weston
> probably knows.
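>
> For example (a minimal sketch; ExecContext's constructor takes a memory
> pool and an optional Executor):
>
> // Sync mode: nullptr executor, tasks run on the calling thread.
> arrow::compute::ExecContext sync_ctx(arrow::default_memory_pool(),
>                                      /*executor=*/nullptr);
>
> // Async mode: tasks are scheduled on a thread pool.
> auto pool = arrow::internal::ThreadPool::Make(/*threads=*/4).ValueOrDie();
> arrow::compute::ExecContext async_ctx(arrow::default_memory_pool(),
>                                       pool.get());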
>
> 3. You can think of the TaskGroup as a “parallel for loop”. TaskImpl is
> the function that implements the work that needs to be split up, and
> TaskGroupContinuationImpl is what runs after the for loop. TaskImpl
> will receive the index of the task. If you’re familiar with OpenMP, it’s
> equivalent to this:
>
> #pragma omp parallel for
> for(int i = 0; i < 100; i++)
>     TaskImpl(omp_get_thread_num(), i);
> TaskGroupContinuationImpl();
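>
> In TaskScheduler terms that looks roughly like this (sketch only - the
> exact lambda signatures are in the hash_join.cc links below):
>
> // TaskImpl: one iteration of the "parallel for"; gets (thread_id, task_id).
> // TaskGroupContinuationImpl: runs once after every iteration has finished.
> int group_id = scheduler->RegisterTaskGroup(
>     [](size_t thread_id, int64_t task_id) -> Status {
>       TaskImpl(thread_id, task_id);
>       return Status::OK();
>     },
>     [](size_t thread_id) -> Status {
>       TaskGroupContinuationImpl();
>       return Status::OK();
>     });
> Status st = scheduler->StartTaskGroup(thread_id, group_id,
>                                       /*total_num_tasks=*/100);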
>
> Examples of the two are here:
>
> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L416
>
> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L458
>
> Sasha
>
> > On Apr 25, 2022, at 8:35 AM, Li Jin <ice.xell...@gmail.com> wrote:
> >
> > Hello!
> >
> > I am reading the use of TaskScheduler inside the C++ compute code
> > (reading hash join) and have some questions about it, in particular:
> >
> > (1) What is the purpose of the SchedulerTaskCallback defined here:
> > https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join_node.cc#L428
> > (My guess is that the caller of TaskScheduler::StartTaskGroup needs to
> > provide an implementation of a task executor, and the implementation of
> > SchedulerTaskCallback inside hash_join_node.cc is just a vanilla
> > implementation.)
> >
> > (2) When would this task context not have an executor?
> >
> > https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/exec/hash_join_node.cc#L581
> >
> > (3) What's the difference between TaskImpl and TaskGroupContinuationImpl
> > in TaskScheduler::RegisterTaskGroup? And how would one normally define
> > TaskGroupContinuationImpl?
> >
> > Sorry, I am still learning the Arrow compute internals and would
> > appreciate help understanding these.
> >
> > Li
>
>
