I think this is doable. I think we want to introduce the concept of a batch index. The scanner is then responsible for assigning a batch index to each outgoing batch. Some ExecNodes would reset or destroy the batch index (for example, you cannot generally do an asof join after a hash join unless you insert a sort node in between). The sort node can reinstate the batch index.

An OrderedExecNode would extend ExecNode and use this batch index. A simple implementation would be: OrderedExecNode has two states, processing and idle.

* If a batch arrives, it is the next batch, and the state is idle: move the state to processing and process as many batches as possible, until the batch queue no longer contains the next batch.
* If a batch arrives and it is not the next batch: add the batch to the batch queue.
* If a batch arrives and the state is not idle: add the batch to the batch queue.

The batch queue can be some kind of heap structure / priority queue that knows whether the top item is "the next item". "The next item" is a well-defined concept as long as the batch index is monotonically increasing (e.g. this could introduce some complication if the asof join comes after a filter node that eliminates an entire batch, but we can always emit an empty batch).

I call the above a simple implementation because it runs serially, and so it could become a choke point for a multithreaded engine. A more complex implementation would change "process the batch" to "synchronously increment counters and update shared state, then launch a task that can run in parallel". For example, in an as-of join I think you could do better to avoid a serial choke point. (A rough sketch of the simple, serial version is below.)
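Here is a minimal, self-contained sketch of that serial version. The Batch struct, the map-based queue, and ProcessBatch are simplified stand-ins for illustration, not the real ExecNode API:

#include <cstdint>
#include <iostream>
#include <map>
#include <mutex>
#include <utility>
#include <vector>

// Simplified stand-in for an exec batch; the real engine carries arrays.
struct Batch {
  int64_t index = 0;        // assigned by the scanner
  std::vector<int> values;  // placeholder payload
};

// Sketch of the "simple implementation": a node that processes batches
// strictly in batch-index order, buffering out-of-order arrivals.
class OrderedExecNode {
 public:
  void InputReceived(Batch batch) {
    std::unique_lock<std::mutex> lock(mutex_);
    queue_.emplace(batch.index, std::move(batch));
    if (processing_) return;  // another thread is draining; just enqueue
    processing_ = true;       // idle -> processing
    // Drain while the smallest buffered index is the next expected one.
    while (!queue_.empty() && queue_.begin()->first == next_index_) {
      Batch next = std::move(queue_.begin()->second);
      queue_.erase(queue_.begin());
      ++next_index_;
      lock.unlock();  // do the real work outside the lock
      ProcessBatch(std::move(next));
      lock.lock();
    }
    processing_ = false;  // processing -> idle
  }

 private:
  void ProcessBatch(Batch batch) {
    // A real node would do its ordered work here (e.g. one as-of join step).
    std::cout << "processing batch " << batch.index << "\n";
  }

  std::mutex mutex_;
  std::map<int64_t, Batch> queue_;  // stands in for the priority queue
  int64_t next_index_ = 0;
  bool processing_ = false;         // the idle/processing state
};

int main() {
  OrderedExecNode node;
  node.InputReceived({1, {}});  // arrives early: buffered
  node.InputReceived({0, {}});  // the next batch: drains 0, then 1
  node.InputReceived({2, {}});
}

The map keyed by batch index plays the role of the priority queue: its smallest key is "the top item", and draining stops as soon as that key is not the next expected index.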
On Tue, Apr 26, 2022 at 9:54 AM Li Jin <ice.xell...@gmail.com> wrote:
> Hey thanks again for the reply!
>
>> I would suggest accumulating all batches just like in Hash Join
>
> This is something I intentionally try to avoid, because asof join (and many other time series operations) can be performed in a streaming fashion to reduce memory footprint.
>
>> When you want to scale up to multiple threads, you will no longer be able to rely on any order because scheduling is generally pretty nondeterministic
>
> I think this depends on the implementation. If all the nodes receive and output ordered batches, inside each node they can still use multiple threads to compute the result. Extra work needs to be done to ensure the ordering of the output data, but I don't think it is infeasible to multi-thread a streaming algorithm.
>
> Maybe a broader question is "What is a good way to implement an ExecNode that requires ordered inputs and produces ordered output, and how does that fit with the current parallelization model of Arrow compute?"
>
> To be honest, I am not very familiar with the parallelization model. I have the vague impression that each ExecNode has its own processing thread and data is sent asynchronously between nodes (i.e., each node will queue the input batches instead of blocking the upstream node's thread). I also have the impression that some nodes do parallelization internally, but I am not sure how they would output (i.e., do they output batches / call InputReceived on downstream nodes from multiple threads)?
>
> On Tue, Apr 26, 2022 at 2:22 PM Sasha Krassovsky <krassovskysa...@gmail.com> wrote:
>> I would advise against relying on any specific ordering of batches coming in. When you want to scale up to multiple threads, you will no longer be able to rely on any order because scheduling is generally pretty nondeterministic. I would suggest accumulating all batches just like in Hash Join, and when InputFinished is called, sort them. I'd even suggest not relying on input batches being sorted within themselves, so you'll have to implement a mini "sort node" (if you don't have some other fancier data structure for this join).
>>
>> The accumulation itself shouldn't be a performance hit either, because the threads that would be processing the join will continue processing the inputs to the join, so the overall throughput shouldn't be affected.
>>
>> After the sorting, you can kick off a task group that will compute the results. One thing you'll have to experiment with is how many tasks to start: one for each pair of batches, or one for each left-side batch, or one for each right-side batch. If it's the first, it may be useful to extend the TaskGroup interface to allow for two-dimensional task groups (so that it would be easier to start a task for each pair).
>>
>> Sasha
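To make the "one task for each pair" idea above concrete: even without extending the interface, a two-dimensional task group can be emulated on top of a one-dimensional one by flattening the pair of indices into a single task id. A hypothetical sketch (ScheduleTasks and JoinPair are made-up stand-ins, not existing Arrow APIs):

#include <cstdint>
#include <functional>
#include <iostream>

// Hypothetical stand-in for starting a 1-D task group; a real engine would
// hand the task ids to its executor instead of running them inline.
void ScheduleTasks(int64_t num_tasks, const std::function<void(int64_t)>& task) {
  for (int64_t i = 0; i < num_tasks; ++i) task(i);
}

// Stand-in for joining one (left batch, right batch) pair.
void JoinPair(int64_t left_batch, int64_t right_batch) {
  std::cout << "join " << left_batch << " x " << right_batch << "\n";
}

// "One task per pair": a 2-D iteration space flattened into 1-D task ids,
// so existing one-dimensional task-group machinery can drive it.
void StartPairwiseJoin(int64_t num_left, int64_t num_right) {
  ScheduleTasks(num_left * num_right, [=](int64_t i) {
    JoinPair(/*left_batch=*/i / num_right, /*right_batch=*/i % num_right);
  });
}

int main() { StartPairwiseJoin(/*num_left=*/2, /*num_right=*/3); }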
>> On Apr 26, 2022, at 11:03 AM, Li Jin <ice.xell...@gmail.com> wrote:
>>>
>>>> In order to produce an output for a left batch, I would need to wait until I have received enough batches from the right tables to cover all potential matches (wait until I have seen right timestamps outside the matching range)
>>>
>>> To add a bit more explanation: let's say the time range of the current left batch is (20200101, 20200201). In order to produce the join result for this batch, I need to receive all data from the right tables from (-inf, 20200201), and I know I have it once I have seen data from all right tables with a timestamp after 20200201 (since data arrives in time order). But I do not need to receive all the data in order to produce an output batch (unlike hash join).
>>>
>>> On Tue, Apr 26, 2022 at 1:59 PM Li Jin <ice.xell...@gmail.com> wrote:
>>>> Thanks both for the reply. To add a bit more context, I am trying to implement an "asof join". Here I have one left table and n right tables, and all batches arrive in time order.
>>>>
>>>> In order to produce an output for a left batch, I would need to wait until I have received enough batches from the right tables to cover all potential matches (wait until I have seen right timestamps outside the matching range).
>>>>
>>>> From both replies, it sounds like I should just check whether I have enough data in the InputReceived function and do the work there when I have enough data.
>>>>
>>>> However, one thing that I am not sure about is how the parallelization comes into play - it sounds like InputReceived could be called by multiple threads of the same input node for different batches? Currently I am just trying to get a baseline implementation that has one thread doing the join, so if InputReceived is called by multiple threads I might end up blocking other threads unnecessarily.
>>>>
>>>> If I had a dedicated thread/executor that does the join, and InputReceived just queued the batches and returned immediately, I feel like it would be more efficient.
>>>>
>>>> Thoughts?
>>>>
>>>> Thanks,
>>>> Li
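The waiting condition described above can be phrased as a per-right-table watermark check. A small illustrative sketch (the names are made up; this is not an Arrow API):

#include <cstdint>
#include <vector>

// Latest timestamp seen so far from each right table. Because batches
// arrive in time order, each entry is a watermark: every right row with
// a timestamp <= it has already been received.
struct RightSideState {
  std::vector<int64_t> latest_seen;
};

// A left batch covering timestamps up to left_max_time can be joined once
// every right table has produced a timestamp beyond that point, i.e. we
// hold all right rows in (-inf, left_max_time].
bool CanEmitLeftBatch(const RightSideState& right, int64_t left_max_time) {
  for (int64_t t : right.latest_seen) {
    if (t <= left_max_time) return false;  // this table may still send matches
  }
  return true;
}

int main() {
  RightSideState right{{20200315, 20200210}};
  return CanEmitLeftBatch(right, 20200201) ? 0 : 1;  // both watermarks past 20200201 -> ready
}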
>>>> On Mon, Apr 25, 2022 at 6:41 PM Sasha Krassovsky <krassovskysa...@gmail.com> wrote:
>>>>> If I understand correctly, on InputReceived you'll be accumulating batches until you have enough to compute the next output? In that case, you have two options: you can either just immediately compute it using the same thread, or call the schedule_callback directly (not using the scheduler). I think your pseudocode is correct - since whether or not you can output the next batch can only change on InputReceived, that's the only spot you need to check. I think an elaboration of your pseudocode could be something like:
>>>>>
>>>>> Status InputReceived(Batch b)
>>>>>   lock(accum_lock);
>>>>>   accum.push_back(b);
>>>>>   if(enough_inputs)
>>>>>     vector<Batch> batches = std::move(accum);
>>>>>     unlock(accum_lock);
>>>>>     compute_next_output(batches);
>>>>>   return Status::OK();
>>>>>
>>>>> Sasha
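Spelled out with RAII locking, and releasing the lock on the not-enough-inputs path (which the pseudocode above glosses over), that could look like the following sketch. Batch, the threshold in EnoughInputs, and ComputeNextOutput are placeholders, and it returns void rather than Arrow's Status to stay self-contained:

#include <mutex>
#include <utility>
#include <vector>

struct Batch {};  // placeholder payload

class Accumulator {
 public:
  void InputReceived(Batch b) {
    std::vector<Batch> batches;
    {
      std::lock_guard<std::mutex> guard(accum_lock_);
      accum_.push_back(std::move(b));
      if (!EnoughInputs()) return;  // guard releases the lock here too
      batches = std::move(accum_);  // take ownership under the lock
      accum_.clear();               // leave the moved-from vector empty
    }
    // Heavy work happens outside the lock, on the calling thread.
    ComputeNextOutput(std::move(batches));
  }

 private:
  bool EnoughInputs() const { return accum_.size() >= 4; }  // placeholder rule

  static void ComputeNextOutput(std::vector<Batch> batches) {
    // The actual join work for this group of batches would go here.
  }

  std::mutex accum_lock_;
  std::vector<Batch> accum_;
};

int main() {
  Accumulator acc;
  for (int i = 0; i < 8; ++i) acc.InputReceived(Batch{});  // emits once per 4 batches
}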
>>>>> On Apr 25, 2022, at 3:29 PM, Li Jin <ice.xell...@gmail.com> wrote:
>>>>>> Thanks! That's super helpful.
>>>>>>
>>>>>> A follow-up question on TaskScheduler - what's the correct way to define a task that "does work if input batches are ready, otherwise tries later"?
>>>>>>
>>>>>> Something like:
>>>>>>
>>>>>> Status try_process():
>>>>>>   if enough_inputs_to_produce_next_output:
>>>>>>     compute_and_produce_next_output();
>>>>>>     return Status::OK()
>>>>>>   else:
>>>>>>     # Is this right?
>>>>>>     # Exit and try later
>>>>>>     return Status::OK();
>>>>>>
>>>>>> If I register this function with TaskScheduler, I think it only gets run once, so I think I might need to schedule the next task when inputs are not ready, but I am not sure of the best way to do that. Any suggestions?
>>>>>>
>>>>>> Li
>>>>>>
>>>>>> On Mon, Apr 25, 2022 at 6:18 PM Sasha Krassovsky <krassovskysa...@gmail.com> wrote:
>>>>>>> Hi Li,
>>>>>>> I'll answer the questions in order:
>>>>>>>
>>>>>>> 1. Your guess is correct! The Hash Join may be used standalone (mostly in testing or benchmarking for now) or as part of the ExecNode. The ExecNode will pass the task to the Executor to be scheduled, or will run it immediately if it's in sync mode (i.e. no executor). Our Hash Join benchmark uses OpenMP to schedule things, and passes a lambda that does OpenMP things to the HashJoin.
>>>>>>>
>>>>>>> 2. We might not have an executor if we want to execute synchronously. This is set during construction of the ExecContext, which is given to the ExecPlan during creation. If the ExecContext has a nullptr Executor, then we are in sync mode; otherwise we use the Executor to schedule. One confusing thing is that we also have a SerialExecutor - I'm actually not quite sure what the difference between using that and setting the Executor to nullptr is (might have something to do with testing?). @Weston probably knows.
>>>>>>>
>>>>>>> 3. You can think of the TaskGroup as a "parallel for loop". TaskImpl is the function that implements the work that needs to be split up; TaskGroupContinuationImpl is what gets run after the for loop. TaskImpl will receive the index of the task. If you're familiar with OpenMP, it's equivalent to this:
>>>>>>>
>>>>>>> #pragma omp parallel for
>>>>>>> for(int i = 0; i < 100; i++)
>>>>>>>   TaskImpl(omp_get_thread_num(), i);
>>>>>>> TaskGroupContinuationImpl();
>>>>>>>
>>>>>>> Examples of the two are here:
>>>>>>>
>>>>>>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L416
>>>>>>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L458
>>>>>>>
>>>>>>> Sasha
>>>>>>>
>>>>>>> On Apr 25, 2022, at 8:35 AM, Li Jin <ice.xell...@gmail.com> wrote:
>>>>>>>> Hello!
>>>>>>>>
>>>>>>>> I am reading the use of TaskScheduler inside the C++ compute code (reading hash join) and have some questions about it, in particular:
>>>>>>>>
>>>>>>>> (1) What is the purpose of the SchedulerTaskCallback defined here:
>>>>>>>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join_node.cc#L428
>>>>>>>> (My guess is that the caller of TaskScheduler::StartTaskGroup needs to provide an implementation of a task executor, and the implementation of SchedulerTaskCallback inside hash_join_node.cc is just a vanilla implementation.)
>>>>>>>>
>>>>>>>> (2) When would this task context not have an executor?
>>>>>>>> https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/exec/hash_join_node.cc#L581
>>>>>>>>
>>>>>>>> (3) What's the difference between TaskImpl and TaskGroupContinuationImpl in TaskScheduler::RegisterTaskGroup? And how would one normally define TaskGroupContinuationImpl?
>>>>>>>>
>>>>>>>> Sorry, I am still learning the Arrow compute internals and appreciate help on understanding these.
>>>>>>>>
>>>>>>>> Li
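As a footnote to point 3 above: the same "parallel for loop plus continuation" shape can be emulated with plain std::thread. A toy sketch (this is not Arrow's TaskScheduler; TaskImpl and TaskGroupContinuationImpl are the placeholders from Sasha's explanation):

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <thread>
#include <vector>

void TaskImpl(size_t thread_id, int64_t task_id) {
  // The work that gets split up across the "parallel for loop".
  std::cout << "thread " << thread_id << " ran task " << task_id << "\n";
}

void TaskGroupContinuationImpl() {
  // Runs exactly once, after every task in the group has finished.
  std::cout << "task group done\n";
}

int main() {
  const size_t num_threads = 4;
  const int64_t num_tasks = 16;
  std::vector<std::thread> threads;
  // Each thread claims a strided slice of the task ids, like a static
  // schedule of the parallel for loop.
  for (size_t t = 0; t < num_threads; ++t) {
    threads.emplace_back([t, num_threads, num_tasks] {
      for (int64_t i = static_cast<int64_t>(t); i < num_tasks;
           i += static_cast<int64_t>(num_threads)) {
        TaskImpl(t, i);
      }
    });
  }
  for (auto& thread : threads) thread.join();  // wait for the whole group...
  TaskGroupContinuationImpl();                 // ...then run the continuation
}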