If I understand correctly, on InputReceived you’ll be accumulating batches until you have enough to compute the next output? In that case, you have two options: you can either just immediately compute it using the same thread, or call the schedule_callback directly (not using the scheduler). I think your pseudocode is correct - since whether or not you can output the next batch can only change on InputReceived, that’s the only spot you need to check. I think an elaboration of your pseudocode could be something like:
Status InputReceived(Batch b) {
  lock(accum_lock);
  accum.push_back(b);
  if (enough_inputs) {
    vector<Batch> batches = std::move(accum);
    unlock(accum_lock);
    compute_next_output(batches);  // compute outside the lock
  } else {
    unlock(accum_lock);  // not enough yet; wait for the next InputReceived
  }
  return Status::OK();
}

Sasha

> On Apr 25, 2022, at 3:29 PM, Li Jin <ice.xell...@gmail.com> wrote:
>
> Thanks! That's super helpful.
>
> A follow-up question on TaskScheduler - what's the correct way to define a
> task that "does work if input batches are ready, otherwise tries later"?
>
> Something like:
>
> Status try_process():
>     if enough_inputs_to_produce_next_output:
>         compute_and_produce_next_output()
>         return Status::OK()
>     else:
>         # Is this right?
>         # Exit and try later
>         return Status::OK()
>
> If I register this function with TaskScheduler, I think it only gets run
> once, so I think I might need to schedule the next task when inputs are not
> ready, but I am not sure of the best way to do that. Any suggestions?
>
> Li
>
> On Mon, Apr 25, 2022 at 6:18 PM Sasha Krassovsky <krassovskysa...@gmail.com> wrote:
>
>> Hi Li,
>> I’ll answer the questions in order:
>>
>> 1. Your guess is correct! The Hash Join may be used standalone (mostly in
>> testing or benchmarking for now) or as part of the ExecNode. The ExecNode
>> will pass the task to the Executor to be scheduled, or will run it
>> immediately if it’s in sync mode (i.e. no executor). Our Hash Join
>> benchmark uses OpenMP to schedule things, and passes a lambda that does
>> OpenMP things to the HashJoin.
>>
>> 2. We might not have an executor if we want to execute synchronously. This
>> is set during construction of the ExecContext, which is given to the
>> ExecPlan during creation. If the ExecContext has a nullptr Executor, then
>> we are in sync mode; otherwise we use the Executor to schedule.
>> One confusing thing is that we also have a SerialExecutor - I’m actually
>> not quite sure what the difference between using that and setting the
>> Executor to nullptr is (might have something to do with testing?).
>> @Weston probably knows.
>>
>> 3. You can think of the TaskGroup as a “parallel for loop”. TaskImpl is
>> the function that implements the work that needs to be split up, and
>> TaskGroupContinuationImpl is what gets run after the for loop. TaskImpl
>> will receive the index of the task. If you’re familiar with OpenMP, it’s
>> equivalent to this:
>>
>> #pragma omp parallel for
>> for (int i = 0; i < 100; i++)
>>     TaskImpl(omp_get_thread_num(), i);
>> TaskGroupContinuationImpl();
>>
>> Examples of the two are here:
>>
>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L416
>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L458
>>
>> Sasha
>>
>>> On Apr 25, 2022, at 8:35 AM, Li Jin <ice.xell...@gmail.com> wrote:
>>>
>>> Hello!
>>>
>>> I am reading the use of TaskScheduler inside the C++ compute code (reading
>>> hash join) and have some questions about it, in particular:
>>>
>>> (1) What is the purpose of the SchedulerTaskCallback defined here:
>>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join_node.cc#L428
>>> (My guess is that the caller of TaskScheduler::StartTaskGroup needs to
>>> provide an implementation of a task executor, and the implementation of
>>> SchedulerTaskCallback inside hash_join_node.cc is just a vanilla
>>> implementation.)
>>>
>>> (2) When would this task context not have an executor?
>>> https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/exec/hash_join_node.cc#L581
>>>
>>> (3) What's the difference between TaskImpl and TaskGroupContinuationImpl
>>> in TaskScheduler::RegisterTaskGroup? And how would one normally define
>>> TaskGroupContinuationImpl?
>>>
>>> Sorry, I am still learning the Arrow compute internals and would
>>> appreciate help understanding these.
>>>
>>> Li