Thanks Sasha, your intuition on the SerialExecutor is correct.  One of
the changes I am working on[1] will make it so that an executor is
always present.  The behavior when you do not have an executor is
rather strange (sometimes I/O threads are used and sometimes the
calling thread is used), which makes performance hard to reason
about.  I plan to get back to this PR once 8.0.0 is finished, but it
will unfortunately require some changes to the scanner.

I do not believe the TaskScheduler handles "do work when inputs are
ready, otherwise try again later".  It was originally written with
join tasks in mind, where we can't do any work until all of the input
has arrived.  So the pattern there is "queue everything in the node
until we are ready to process it all, and then run the task scheduler".

Generally speaking, when a batch arrives and you realize you have
enough data to do some work, you should just do it there.  This is how
the filter & project nodes operate.  They get their parallelism from
the fact that we will be processing many batches in parallel (i.e.
there is no need to process a single batch in parallel).  However, if
you've accumulated enough data that you want to process it in
parallel, then you should create a task scheduler at that point.  For
example, if you have queued up batches 1, 2, 3, and 4 (out of 20 total
batches) and are now ready to process them, you could create a
TaskScheduler.  You could then find yourself creating multiple task
schedulers throughout the run of a node.  The continuation would be
marking batches 1/2/3/4 done, sending out any necessary results and,
if that was the last batch group to finish, marking the node finished.
This feels like a general pattern. We could probably build some
abstractions and common utilities around it if you find yourself up to
it.

Just to continue the example, let's pretend you can do work when you
get 4 batches.  So if you have 20 total batches there would be 5
"mega-tasks" worth of work.  Each "mega-task" might run in parallel
with other "mega-tasks".  Each mega-task would also, itself, spawn
smaller parallel sub-tasks.  Each mega-task would be responsible for
delivering its own output.  The last mega-task to finish is also
responsible for finishing the node.

[1] https://github.com/apache/arrow/pull/12468

On Mon, Apr 25, 2022 at 12:19 PM Sasha Krassovsky
<krassovskysa...@gmail.com> wrote:
>
> Hi Li,
> I’ll answer the questions in order:
>
> 1. Your guess is correct! The Hash Join may be used standalone (mostly in 
> testing or benchmarking for now) or as part of the ExecNode. The ExecNode 
> will pass the task to the Executor to be scheduled, or will run it 
> immediately if it’s in sync mode (i.e. no executor). Our Hash Join benchmark 
> uses OpenMP to schedule things, and passes a lambda that does OpenMP things 
> to the HashJoin.
>
> 2. We might not have an executor if we want to execute synchronously. This is 
> set during construction of the ExecContext, which is given to the ExecPlan 
> during creation. If the ExecContext has a nullptr Executor, then we are in 
> sync mode, otherwise we use the Executor to schedule. One confusing thing is 
> that we also have a SerialExecutor - I’m actually not quite sure what the 
> difference between using that and setting the Executor to nullptr is (it 
> might have something to do with testing?). @Weston probably knows
>
> 3. You can think of the TaskGroup as a “parallel for loop”. TaskImpl is the 
> function that implements the work that needs to be split up, 
> TaskGroupContinuationImpl is what gets run after the for loop. TaskImpl will 
> receive the index of the task. If you’re familiar with OpenMP, it’s 
> equivalent to this:
>
> #pragma omp parallel for
> for(int i = 0; i < 100; i++)
>     TaskImpl(omp_get_thread_num(), i);
> TaskGroupContinuationImpl();
>
> Examples of the two are here:
> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L416
> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L458
>
> Sasha
>
> > On Apr 25, 2022, at 8:35 AM, Li Jin <ice.xell...@gmail.com> wrote:
> >
> > Hello!
> >
> > I am reading the use of TaskScheduler inside C++ compute code (reading hash
> > join) and have some questions about it, in particular:
> >
> > (1) What is the purpose of the SchedulerTaskCallback defined here:
> > https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join_node.cc#L428
> > (My guess is that the caller of TaskScheduler::StartTaskGroup needs to
> > provide an implementation of a task executor, and the implementation of
> > SchedulerTaskCallback inside hash_join_node.cc is just a vanilla
> > implementation)
> >
> > (2) When would this task context not have an executor?
> > https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/exec/hash_join_node.cc#L581
> >
> > (3) What's the difference between TaskImpl and TaskGroupContinuationImpl in
> > TaskScheduler::RegisterTaskGroup? And how would one normally define
> > TaskGroupContinuationImpl?
> >
> > Sorry I am still learning the Arrow compute internals and appreciate help
> > on understanding these.
> >
> > Li
>
