> In order to produce an output for a left batch, I would need to wait until
> I have received enough batches from the right tables to cover all potential
> matches (wait until I have seen right timestamps outside the matching range)

To add a bit more explanation: say the time range of the current left batch
is (20200101, 20200201). In order to produce the join result for this batch,
I need to receive all data from the right tables in (-inf, 20200201), and I
know I have all of it once I have seen data from every right table with a
timestamp after 20200201 (since data arrives in time order). But I do not
need to receive all of the data before producing an output batch (unlike
hash join).
On Tue, Apr 26, 2022 at 1:59 PM Li Jin <ice.xell...@gmail.com> wrote:

> Thanks both for the reply. To add a bit more context, I am trying to
> implement an "asof join". Here I have one left table and n right tables,
> and all batches arrive in time order.
>
> In order to produce an output for a left batch, I would need to wait
> until I have received enough batches from the right tables to cover all
> potential matches (wait until I have seen right timestamps outside the
> matching range).
>
> From both replies it sounds like I should check whether I have enough
> data in InputReceived and do the work there when I do.
>
> However, one thing that I am not sure about is how the parallelization
> comes into play - it sounds like InputReceived could be called by
> multiple threads of the same input node for different batches? Currently
> I am just trying to get a baseline implementation with one thread doing
> the join, so if InputReceived is called by multiple threads I might end
> up blocking other threads unnecessarily.
>
> If I had a dedicated thread/executor that does the join, and
> InputReceived just queued the batches and returned immediately, I feel
> it would be more efficient.
>
> Thoughts?
>
> Thanks,
> Li
>
> On Mon, Apr 25, 2022 at 6:41 PM Sasha Krassovsky
> <krassovskysa...@gmail.com> wrote:
>
>> If I understand correctly, on InputReceived you'll be accumulating
>> batches until you have enough to compute the next output? In that case,
>> you have two options: you can either just immediately compute it using
>> the same thread, or call the schedule_callback directly (not using the
>> scheduler). I think your pseudocode is correct - since whether or not
>> you can output the next batch can only change on InputReceived, that's
>> the only spot you need to check.
>> I think an elaboration of your pseudocode could be something like:
>>
>> Status InputReceived(Batch b) {
>>   lock(accum_lock);
>>   accum.push_back(b);
>>   if (!enough_inputs) {
>>     unlock(accum_lock);  // release even when there is nothing to compute
>>     return Status::OK();
>>   }
>>   vector<Batch> batches = std::move(accum);
>>   unlock(accum_lock);
>>   compute_next_output(batches);
>>   return Status::OK();
>> }
>>
>> Sasha
>>
>>> On Apr 25, 2022, at 3:29 PM, Li Jin <ice.xell...@gmail.com> wrote:
>>>
>>> Thanks! That's super helpful.
>>>
>>> A follow-up question on TaskScheduler - what's the correct way to
>>> define a task that "does work if input batches are ready, otherwise
>>> tries later"? Something like:
>>>
>>> Status try_process() {
>>>   if (enough_inputs_to_produce_next_output) {
>>>     compute_and_produce_next_output();
>>>     return Status::OK();
>>>   }
>>>   // Is this right? Exit and try later.
>>>   return Status::OK();
>>> }
>>>
>>> If I register this function with TaskScheduler, I think it only gets
>>> run once, so I think I might need to schedule the next task when
>>> inputs are not ready, but I am not sure of the best way to do that.
>>> Any suggestions?
>>>
>>> Li
>>>
>>> On Mon, Apr 25, 2022 at 6:18 PM Sasha Krassovsky
>>> <krassovskysa...@gmail.com> wrote:
>>>
>>>> Hi Li,
>>>> I'll answer the questions in order:
>>>>
>>>> 1. Your guess is correct! The Hash Join may be used standalone
>>>> (mostly in testing or benchmarking for now) or as part of the
>>>> ExecNode. The ExecNode will pass the task to the Executor to be
>>>> scheduled, or will run it immediately if it's in sync mode (i.e. no
>>>> executor). Our Hash Join benchmark uses OpenMP to schedule things,
>>>> and passes a lambda that does OpenMP things to the HashJoin.
>>>>
>>>> 2. We might not have an executor if we want to execute synchronously.
>>>> This is set during construction of the ExecContext, which is given
>>>> to the ExecPlan during creation.
>>>> If the ExecContext has a nullptr Executor, then we are in sync mode;
>>>> otherwise we use the Executor to schedule. One confusing thing is
>>>> that we also have a SerialExecutor - I'm actually not quite sure what
>>>> the difference between using that and setting the Executor to nullptr
>>>> is (might have something to do with testing?). @Weston probably knows.
>>>>
>>>> 3. You can think of the TaskGroup as a "parallel for loop". TaskImpl
>>>> is the function that implements the work that needs to be split up,
>>>> and TaskGroupContinuationImpl is what gets run after the for loop.
>>>> TaskImpl will receive the index of the task. If you're familiar with
>>>> OpenMP, it's equivalent to this:
>>>>
>>>> #pragma omp parallel for
>>>> for(int i = 0; i < 100; i++)
>>>>   TaskImpl(omp_get_thread_num(), i);
>>>> TaskGroupContinuationImpl();
>>>>
>>>> Examples of the two are here:
>>>>
>>>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L416
>>>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L458
>>>>
>>>> Sasha
>>>>
>>>>> On Apr 25, 2022, at 8:35 AM, Li Jin <ice.xell...@gmail.com> wrote:
>>>>>
>>>>> Hello!
>>>>>
>>>>> I am reading the use of TaskScheduler inside the C++ compute code
>>>>> (reading hash join) and have some questions about it, in particular:
>>>>>
>>>>> (1) What is the purpose of the SchedulerTaskCallback defined here:
>>>>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join_node.cc#L428
>>>>> (My guess is that the caller of TaskScheduler::StartTaskGroup needs
>>>>> to provide an implementation of a task executor, and the
>>>>> implementation of SchedulerTaskCallback inside hash_join_node.cc is
>>>>> just a vanilla implementation.)
>>>>>
>>>>> (2) When would this task context not have an executor?
>>>>> https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/exec/hash_join_node.cc#L581
>>>>>
>>>>> (3) What's the difference between TaskImpl and
>>>>> TaskGroupContinuationImpl in TaskScheduler::RegisterTaskGroup? And
>>>>> how would one normally define TaskGroupContinuationImpl?
>>>>>
>>>>> Sorry, I am still learning the Arrow compute internals and would
>>>>> appreciate help understanding these.
>>>>>
>>>>> Li