If I understand correctly, on InputReceived you’ll be accumulating batches 
until you have enough to compute the next output? In that case, you have two 
options: you can either just immediately compute it using the same thread, or 
call the schedule_callback directly (not using the scheduler). I think your 
pseudocode is correct - since whether or not you can output the next batch can 
only change on InputReceived, that’s the only spot you need to check. I think 
an elaboration of your pseudocode could be something like:

Status InputReceived(Batch b)
    lock(accum_lock);
    accum.push_back(b);
    if(enough_inputs)
        vector<Batch> batches = std::move(accum);
        unlock(accum_lock);
        compute_next_output(batches);
    else
        unlock(accum_lock);  // don't hold the lock if there's nothing to do
    return Status::OK();
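To make the pattern concrete, here is a compilable sketch of the pseudocode above. Note the names here (Batch, kEnough, Accumulator, ComputeNextOutput) are stand-ins I made up for illustration, not the real Arrow types; the point is just the locking discipline: append under the lock, move the accumulated batches out, release the lock, then compute on the calling thread.

```cpp
#include <cstddef>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-ins for the real types: a Batch is just an int here,
// and "enough inputs" means kEnough batches have been accumulated.
using Batch = int;
constexpr std::size_t kEnough = 3;

class Accumulator {
 public:
  // Called on InputReceived. Appends the batch under the lock; once enough
  // batches have arrived, moves them out, releases the lock, and computes
  // the next output on the same (calling) thread.
  void InputReceived(Batch b) {
    std::vector<Batch> ready;
    {
      std::lock_guard<std::mutex> guard(accum_lock_);
      accum_.push_back(b);
      if (accum_.size() < kEnough) return;  // not enough yet; nothing to do
      ready = std::move(accum_);
      accum_.clear();
    }  // lock released before the (possibly expensive) compute
    ComputeNextOutput(std::move(ready));
  }

  int outputs_produced = 0;

 private:
  void ComputeNextOutput(std::vector<Batch> batches) {
    // Real work would go here; we just count outputs for illustration.
    ++outputs_produced;
  }

  std::mutex accum_lock_;
  std::vector<Batch> accum_;
};
```

Moving the vector out before computing keeps the critical section short, so other producer threads calling InputReceived are never blocked behind the compute.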

Sasha 

> On Apr 25, 2022, at 3:29 PM, Li Jin <ice.xell...@gmail.com> wrote:
> 
> Thanks! That's super helpful.
> 
> A follow up question on TaskScheduler - What's the correct way to define a
> task that "do work if input batches are ready, otherwise try later"?
> 
> Sth like
> 
> Status try_process():
>     if enough_inputs_to_produce_next_output:
>         compute_and_produce_next_output()
>         return Status::OK()
>     else:
>         # Is this right?
>         # Exit and try later
>         return Status::OK()
> 
> If I register this function with TaskScheduler, I think it only gets run
> once, so I think I might need to schedule the next task when inputs are not
> ready but I am not sure of the best way to do that. Any suggestions?
> 
> Li
> 
> On Mon, Apr 25, 2022 at 6:18 PM Sasha Krassovsky <krassovskysa...@gmail.com 
> <mailto:krassovskysa...@gmail.com>>
> wrote:
> 
>> Hi Li,
>> I’ll answer the questions in order:
>> 
>> 1. Your guess is correct! The Hash Join may be used standalone (mostly in
>> testing or benchmarking for now) or as part of the ExecNode. The ExecNode
>> will pass the task to the Executor to be scheduled, or will run it
>> immediately if it’s in sync mode (i.e. no executor). Our Hash Join
>> benchmark uses OpenMP to schedule things, and passes a lambda that does
>> OpenMP things to the HashJoin.
>> 
>> 2. We might not have an executor if we want to execute synchronously. This
>> is set during construction of the ExecContext, which is given to the
>> ExecPlan during creation. If the ExecContext has a nullptr Executor, then
>> we are in sync mode; otherwise we use the Executor to schedule. One
>> confusing thing is that we also have a SerialExecutor - I’m actually not
>> quite sure what the difference between using that and setting the Executor
>> to nullptr is (might have something to do with testing?). @Weston probably
>> knows
>> 
>> 3. You can think of the TaskGroup as a “parallel for loop”. TaskImpl is
>> the function that implements the work that needs to be split up,
>> TaskGroupContinuationImpl is what gets run after the for loop. TaskImpl
>> will receive the index of the task. If you’re familiar with OpenMP, it’s
>> equivalent to this:
>> 
>> #pragma omp parallel for
>> for (int i = 0; i < 100; i++)
>>     TaskImpl(omp_get_thread_num(), i);
>> TaskGroupContinuationImpl();
>> 
>> Examples of the two are here:
>> 
>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L416
>> <
>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L416
>>  
>> <https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L416>
>>> 
>> 
>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L458
>> 
>> Sasha
>> 
>>> On Apr 25, 2022, at 8:35 AM, Li Jin <ice.xell...@gmail.com> wrote:
>>> 
>>> Hello!
>>> 
>>> I am reading the use of TaskScheduler inside C++ compute code (reading
>> hash
>>> join) and have some questions about it, in particular:
>>> 
>>> (1) What is the purpose of SchedulerTaskCallback defined here:
>>> 
>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join_node.cc#L428
>>> (My guess is that the caller of TaskScheduler::StartTaskGroup needs to
>>> provide an implementation of a task executor, and the implementation of
>>> SchedulerTaskCallback inside hash_join_node.cc is just a vanilla
>>> implementation)
>>> 
>>> (2) When would this task context not have an executor?
>>> 
>> https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/exec/hash_join_node.cc#L581
>>> 
>>> (3) What's the difference between TaskImpl and TaskGroupContinuationImpl
>> in
>>> TaskScheduler::RegisterTaskGroup? And how would one normally define
>>> TaskGroupContinuationImpl?
>>> 
>>> Sorry I am still learning the Arrow compute internals and appreciate help
>>> on understanding these.
>>> 
>>> Li
