Hey thanks again for the reply!

> I would suggest accumulating all batches just like in Hash Join

This is something I intentionally try to avoid, because asof join (and many other time series operations) can be performed in a streaming fashion to reduce the memory footprint.
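To make the streaming idea concrete, here is a rough sketch of the per-right-table state I have in mind for an asof join with a tolerance window (Batch, RightState, and Prune are made-up names for illustration, not actual Arrow types):

    #include <cstdint>
    #include <deque>

    // Stand-in for an Arrow ExecBatch; only the timestamp bounds matter here.
    struct Batch {
      int64_t min_ts;
      int64_t max_ts;
      // ... column data would live here ...
    };

    // Per-right-table state for a streaming asof join (sketch, made-up names).
    // Because batches arrive in time order, only right rows that could still
    // match a future left row need to stay buffered, so memory is bounded by
    // the match window instead of the whole input.
    struct RightState {
      std::deque<Batch> buffered;  // right batches still inside the match window
    };

    // left_watermark is the smallest left timestamp not yet joined.  Any right
    // batch that ends before left_watermark - tolerance can never match a
    // future left row, so it can be dropped.
    void Prune(RightState* state, int64_t left_watermark, int64_t tolerance) {
      while (!state->buffered.empty() &&
             state->buffered.front().max_ts < left_watermark - tolerance) {
        state->buffered.pop_front();
      }
    }

The point is that once the left side has advanced past a right batch's window, that batch can be released immediately instead of being accumulated for the lifetime of the join.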
> When you want to scale up to multiple threads, you will no longer be able to rely on any order because scheduling is generally pretty nondeterministic

I think this depends on the implementation. If all the nodes receive and output ordered batches, each node can still use multiple threads internally to compute the result. Extra work needs to be done to ensure the ordering of the output data (a sketch of one way to do this follows Sasha's reply below), but I don't think it is infeasible to multithread a streaming algorithm.

Maybe a broader question is "What is a good way to implement an ExecNode that requires ordered inputs and produces ordered output, and how does that fit with the current parallelization model of Arrow compute?"

To be honest I am not very familiar with the parallelization model. I have the vague impression that each ExecNode has its own processing thread and that data is sent asynchronously between nodes (i.e., each node queues the input batches instead of blocking the upstream node's thread). I also have the impression that some nodes parallelize internally, but I am not sure how they produce their output (i.e., do they output batches / call InputReceived on downstream nodes from multiple threads?).

On Tue, Apr 26, 2022 at 2:22 PM Sasha Krassovsky <krassovskysa...@gmail.com> wrote:

> I would advise against relying on any specific ordering of batches coming in. When you want to scale up to multiple threads, you will no longer be able to rely on any order because scheduling is generally pretty nondeterministic. I would suggest accumulating all batches just like in Hash Join, and when InputFinished is called, sort them. I'd even suggest not relying on input batches being sorted within themselves, so you'll have to implement a mini "sort node" (if you don't have some other fancier data structure for this join). The accumulation itself shouldn't be a performance hit either, because the threads that would be processing the join will continue processing the inputs to the join, so the overall throughput shouldn't be affected.
>
> After the sorting, you can kick off a task group that will compute the results. One thing you'll have to experiment with is how many tasks to start: one for each pair of batches, or one for each left-side batch, or one for each right-side batch. If it's the first, it may be useful to extend the TaskGroup interface to allow for two-dimensional task groups (so that it would be easier to start a task for each pair).
>
> Sasha
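(For what it's worth, the "extra work" mentioned above for keeping multithreaded output ordered could look roughly like the sketch below: tag each input batch with a sequence number, and hold back completed results until all earlier ones have been emitted. Names are made up for illustration, not Arrow's API.)

    #include <cstdint>
    #include <map>
    #include <mutex>
    #include <utility>

    // Sketch of an ordered-output gate (made-up names, not Arrow's API).
    // Worker threads may finish batches out of order; results are held back
    // and emitted strictly in sequence-number order.
    template <typename Result, typename Emit>
    class OrderedEmitter {
     public:
      explicit OrderedEmitter(Emit emit) : emit_(std::move(emit)) {}

      // Called from any worker thread when the batch numbered `seq` is done.
      void Finish(int64_t seq, Result result) {
        std::lock_guard<std::mutex> guard(mu_);
        pending_.emplace(seq, std::move(result));
        // Flush the longest contiguous prefix that is now complete.
        while (!pending_.empty() && pending_.begin()->first == next_) {
          emit_(std::move(pending_.begin()->second));
          pending_.erase(pending_.begin());
          ++next_;
        }
      }

     private:
      Emit emit_;
      std::mutex mu_;
      std::map<int64_t, Result> pending_;  // finished but not yet emitted
      int64_t next_ = 0;                   // next sequence number to emit
    };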
>
> > On Apr 26, 2022, at 11:03 AM, Li Jin <ice.xell...@gmail.com> wrote:
> >
> >> In order to produce an output for a left batch, I would need to wait until I have received enough batches from the right tables to cover all potential matches (wait until I have seen right timestamps outside the matching range)
> >
> > To add a bit more explanation: let's say the time range of the current left batch is (20200101, 20200201). In order to produce the join result for this batch, I need to receive all data from the right tables in (-inf, 20200201), and I know I have it once I have seen data from all right tables with timestamps after 20200201 (since data arrives in time order). But I do not need to receive all the data in order to produce an output batch (unlike hash join).
> >
> > On Tue, Apr 26, 2022 at 1:59 PM Li Jin <ice.xell...@gmail.com> wrote:
> >
> >> Thanks both for the reply. To add a bit more context, I am trying to implement an "asof join". Here I have one left table and n right tables, and all batches arrive in time order.
> >>
> >> In order to produce an output for a left batch, I would need to wait until I have received enough batches from the right tables to cover all potential matches (wait until I have seen right timestamps outside the matching range).
> >>
> >> From both replies it sounds like I should just check in InputReceived whether I have enough data, and do the work there when I do.
> >>
> >> However, one thing that I am not sure about is how the parallelization comes into play - it sounds like InputReceived could be called by multiple threads of the same input node for different batches? Currently I am just trying to get a baseline implementation that has one thread doing the join, so if InputReceived is called by multiple threads I might end up blocking other threads unnecessarily.
> >>
> >> If I had a dedicated thread/executor that does the join, and InputReceived just queued the batches and returned immediately, I feel like it would be more efficient.
> >>
> >> Thoughts?
> >>
> >> Thanks,
> >> Li
> >>
> >> On Mon, Apr 25, 2022 at 6:41 PM Sasha Krassovsky <krassovskysa...@gmail.com> wrote:
> >>
> >>> If I understand correctly, on InputReceived you'll be accumulating batches until you have enough to compute the next output? In that case, you have two options: you can either just immediately compute it using the same thread, or call the schedule_callback directly (not using the scheduler). I think your pseudocode is correct - since whether or not you can output the next batch can only change on InputReceived, that's the only spot you need to check. I think an elaboration of your pseudocode could be something like:
> >>>
> >>> Status InputReceived(Batch b) {
> >>>   std::unique_lock<std::mutex> guard(accum_lock);
> >>>   accum.push_back(b);
> >>>   if (enough_inputs) {
> >>>     std::vector<Batch> batches = std::move(accum);
> >>>     guard.unlock();  // release the lock before doing the heavy work
> >>>     compute_next_output(batches);
> >>>   }
> >>>   return Status::OK();  // guard releases the lock here if still held
> >>> }
> >>>
> >>> Sasha
> >>>
> >>>> On Apr 25, 2022, at 3:29 PM, Li Jin <ice.xell...@gmail.com> wrote:
> >>>>
> >>>> Thanks! That's super helpful.
> >>>>
> >>>> A follow up question on TaskScheduler - what's the correct way to define a task that "does work if input batches are ready, otherwise tries again later"? Something like:
> >>>>
> >>>> Status try_process() {
> >>>>   if (enough_inputs_to_produce_next_output) {
> >>>>     compute_and_produce_next_output();
> >>>>     return Status::OK();
> >>>>   }
> >>>>   // Is this right? Exit and try again later.
> >>>>   return Status::OK();
> >>>> }
> >>>>
> >>>> If I register this function with TaskScheduler, I think it only gets run once, so I think I might need to schedule the next task when inputs are not ready, but I am not sure of the best way to do that. Any suggestions?
> >>>>
> >>>> Li
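A rough sketch of the queue-plus-dedicated-thread arrangement floated above (hypothetical names, not the real ExecNode interface): InputReceived would only enqueue and return, so upstream threads never block on join work, while a single worker drains the queue and runs the join serially.

    #include <condition_variable>
    #include <functional>
    #include <mutex>
    #include <queue>
    #include <thread>

    template <typename Batch>
    class SerialJoinWorker {
     public:
      explicit SerialJoinWorker(std::function<void(Batch)> process)
          : process_(std::move(process)), worker_([this] { Run(); }) {}

      ~SerialJoinWorker() {
        { std::lock_guard<std::mutex> g(mu_); done_ = true; }
        cv_.notify_one();
        worker_.join();
      }

      // Called by upstream threads (e.g. from InputReceived); never blocks
      // on join work, only on the short queue lock.
      void Push(Batch b) {
        { std::lock_guard<std::mutex> g(mu_); queue_.push(std::move(b)); }
        cv_.notify_one();
      }

     private:
      void Run() {
        std::unique_lock<std::mutex> lk(mu_);
        for (;;) {
          cv_.wait(lk, [this] { return done_ || !queue_.empty(); });
          if (queue_.empty()) return;  // done_ was set and the queue is drained
          Batch b = std::move(queue_.front());
          queue_.pop();
          lk.unlock();
          process_(std::move(b));  // join work happens outside the lock
          lk.lock();
        }
      }

      std::function<void(Batch)> process_;
      std::mutex mu_;
      std::condition_variable cv_;
      std::queue<Batch> queue_;
      bool done_ = false;
      std::thread worker_;  // declared last so it starts after the other members
    };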
> >>>>
> >>>> On Mon, Apr 25, 2022 at 6:18 PM Sasha Krassovsky <krassovskysa...@gmail.com> wrote:
> >>>>
> >>>>> Hi Li,
> >>>>> I'll answer the questions in order:
> >>>>>
> >>>>> 1. Your guess is correct! The Hash Join may be used standalone (mostly in testing or benchmarking for now) or as part of the ExecNode. The ExecNode will pass the task to the Executor to be scheduled, or will run it immediately if it's in sync mode (i.e. no executor). Our Hash Join benchmark uses OpenMP to schedule things, and passes a lambda that does OpenMP things to the HashJoin.
> >>>>>
> >>>>> 2. We might not have an executor if we want to execute synchronously. This is set during construction of the ExecContext, which is given to the ExecPlan during creation. If the ExecContext has a nullptr Executor, then we are in sync mode; otherwise we use the Executor to schedule. One confusing thing is that we also have a SerialExecutor - I'm actually not quite sure what the difference is between using that and setting the Executor to nullptr (might have something to do with testing?). @Weston probably knows.
> >>>>>
> >>>>> 3. You can think of the TaskGroup as a "parallel for loop". TaskImpl is the function that implements the work that needs to be split up, and TaskGroupContinuationImpl is what gets run after the for loop. TaskImpl will receive the index of the task. If you're familiar with OpenMP, it's equivalent to this:
> >>>>>
> >>>>> #pragma omp parallel for
> >>>>> for (int i = 0; i < 100; i++)
> >>>>>   TaskImpl(omp_get_thread_num(), i);
> >>>>> TaskGroupContinuationImpl();
> >>>>>
> >>>>> Examples of the two are here:
> >>>>>
> >>>>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L416
> >>>>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L458
> >>>>>
> >>>>> Sasha
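For reference, the usage pattern at those two links looks roughly like this (reconstructed from memory of hash_join.cc at that commit, so treat the exact signatures as assumptions; ProcessBatchPair and OnAllTasksDone are hypothetical helpers):

    // TaskImpl runs once per task index, potentially on many threads at once;
    // TaskGroupContinuationImpl runs exactly once, after the last task in the
    // group has finished.
    int task_group_id = scheduler->RegisterTaskGroup(
        /*task_impl=*/[&](size_t thread_index, int64_t task_id) -> Status {
          return ProcessBatchPair(thread_index, task_id);  // hypothetical helper
        },
        /*cont_impl=*/[&](size_t thread_index) -> Status {
          return OnAllTasksDone(thread_index);  // hypothetical helper
        });

    // Later, once the inputs are ready, start one task per unit of work:
    RETURN_NOT_OK(scheduler->StartTaskGroup(thread_index, task_group_id,
                                            /*total_num_tasks=*/num_batches));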
> >>>>>
> >>>>>> On Apr 25, 2022, at 8:35 AM, Li Jin <ice.xell...@gmail.com> wrote:
> >>>>>>
> >>>>>> Hello!
> >>>>>>
> >>>>>> I am reading the use of TaskScheduler inside the C++ compute code (reading hash join) and have some questions about it, in particular:
> >>>>>>
> >>>>>> (1) What is the purpose of the SchedulerTaskCallback defined here:
> >>>>>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join_node.cc#L428
> >>>>>> (My guess is that the caller of TaskScheduler::StartTaskGroup needs to provide an implementation of a task executor, and the implementation of SchedulerTaskCallback inside hash_join_node.cc is just a vanilla implementation.)
> >>>>>>
> >>>>>> (2) When would this task context not have an executor?
> >>>>>> https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/exec/hash_join_node.cc#L581
> >>>>>>
> >>>>>> (3) What's the difference between TaskImpl and TaskGroupContinuationImpl in TaskScheduler::RegisterTaskGroup? And how would one normally define TaskGroupContinuationImpl?
> >>>>>>
> >>>>>> Sorry, I am still learning the Arrow compute internals and would appreciate help understanding these.
> >>>>>>
> >>>>>> Li