I think this is doable.  I think we want to introduce the concept of a
batch index.  The scanner is then responsible for assigning a batch
index to each outgoing batch.  Some ExecNodes would reset or destroy
the batch index (for example, you cannot generally do an asof join
after a hash join unless you insert a sort node in between).  The sort
node can reinstate the batch index.
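
To make this concrete, here is a minimal sketch of what could travel
with each batch (the names are hypothetical, not an existing Arrow
API):

struct IndexedBatch {
  ExecBatch batch;      // the actual data
  int64_t batch_index;  // position in the stream, assigned by the scanner
                        // (hypothetical); order-destroying nodes would drop
                        // or reset it, and a sort node would assign fresh
                        // indices on its output
};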

An OrderedExecNode would extend ExecNode and use this batch index.
A simple implementation would be:

OrderedExecNode has two states, processing and idle.

* If a batch arrives, it is the next batch, and the state is idle:
  Move the state to processing, process as many
  batches as possible until the batch queue does
  not contain the next batch, then return to idle
* If a batch arrives and it is not the next batch:
  Add the batch to the batch queue
* If a batch arrives and the state is not idle:
  Add the batch to the batch queue

The batch queue can be some kind of heap structure / priority queue
that will know whether the top item is "the next item".  "The next
item" is a well defined concept as long as the batch index is
monotonically increasing (e.g. this could introduce some complication
if the asof join is after a filter node which eliminates an entire
batch, but we can always emit an empty batch so the sequence has no
gaps).
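
Putting the pieces together, a rough sketch of the simple
implementation (reusing the hypothetical IndexedBatch above, with a
Process() method standing in for the node's actual work; none of this
is existing Arrow API):

#include <cstdint>
#include <mutex>
#include <queue>
#include <vector>

class OrderedExecNode {
 public:
  virtual ~OrderedExecNode() = default;

  void InputReceived(IndexedBatch batch) {
    std::unique_lock<std::mutex> lock(mutex_);
    queue_.push(std::move(batch));
    if (processing_) return;  // not idle; the draining thread will pick it up
    processing_ = true;
    // Process as many batches as possible until the queue does not
    // contain the next batch, then return to idle.
    while (!queue_.empty() && queue_.top().batch_index == next_index_) {
      IndexedBatch next = queue_.top();
      queue_.pop();
      ++next_index_;
      lock.unlock();
      Process(next);  // the node's actual work; serial in this version
      lock.lock();
    }
    processing_ = false;
  }

 protected:
  // The node's actual work (e.g. the asof join), implemented by subclasses.
  virtual void Process(const IndexedBatch& batch) = 0;

 private:
  struct ByIndex {  // min-heap ordered by batch_index
    bool operator()(const IndexedBatch& a, const IndexedBatch& b) const {
      return a.batch_index > b.batch_index;
    }
  };
  std::mutex mutex_;
  std::priority_queue<IndexedBatch, std::vector<IndexedBatch>, ByIndex> queue_;
  int64_t next_index_ = 0;
  bool processing_ = false;
};

Because the batch index is monotonically increasing, "is the top item
the next item" is just a comparison between the top of the min-heap
and next_index_.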

I call the above a simple implementation because it runs serially and
so this could become a choke point for a multithreaded engine.  A more
complex implementation would change "process the batch" to
"synchronously increment counters and update shared state and then
launch a task that can run in parallel".  For example, in an asof
join I think you could do this to avoid making the join itself a
serial choke point.
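
For example, the drain loop in the sketch above might become something
like this, where UpdateSharedState and ScheduleTask are hypothetical
stand-ins for the node's bookkeeping and the engine's task launcher:

// Inside InputReceived, replacing the serial drain loop:
while (!queue_.empty() && queue_.top().batch_index == next_index_) {
  IndexedBatch next = queue_.top();
  queue_.pop();
  ++next_index_;
  UpdateSharedState(next);  // cheap bookkeeping, done under the lock
  // The heavy per-batch work becomes a task that may run in parallel
  // with the tasks for later batches.
  ScheduleTask([this, next]() { Process(next); });
}

Note the tasks may then finish out of order, so the output side would
need the same batch-index treatment to stay ordered.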

On Tue, Apr 26, 2022 at 9:54 AM Li Jin <ice.xell...@gmail.com> wrote:
>
> Hey thanks again for the reply!
>
> > I would suggest accumulating all batches just like in Hash Join
> This is something I intentionally try to avoid because asof join (and many
> other time series operations) can be performed in a streaming fashion to
> reduce memory footprint.
>
> > When you want to scale up to multiple threads, you will no longer be
> > able to rely on any order because scheduling is generally pretty
> > nondeterministic
> I think this depends on the implementation. If all the nodes receive and
> output ordered batches, inside each node they can still use multiple
> threads to compute the result. Extra work needs to be done to ensure the
> ordering of the output data, but I don't think it is infeasible to
> multi-thread a streaming algorithm.
>
> Maybe a broader question is "What is a good way to implement an ExecNode
> that requires ordered inputs and produces ordered output and how does that
> fit with the current parallelization model of Arrow compute"
>
> To be honest I am not very familiar with the parallelization model. I have
> the vague impression that each ExecNode has its own processing thread and
> data is sent asynchronously between nodes (i.e., each node will queue the
> input batches instead of blocking the upstream node thread). Also I have
> the impression that some nodes do parallelization internally, but I am not
> sure how they produce output (i.e., do they output batches / call
> InputReceived on downstream nodes from multiple threads?).
>
>
>
>
> On Tue, Apr 26, 2022 at 2:22 PM Sasha Krassovsky <krassovskysa...@gmail.com>
> wrote:
>
> > I would advise against relying on any specific ordering of batches coming
> > in. When you want to scale up to multiple threads, you will no longer be
> > able to rely on any order because scheduling is generally pretty
> > nondeterministic. I would suggest accumulating all batches just like in
> > Hash Join, and when InputFinished is called, sort them. I’d even suggest
> > not relying on input batches being sorted within themselves, so you’ll have
> > to implement a mini “sort node” (if you don’t have some other fancier data
> > structure for this join). The accumulation itself shouldn’t be a
> > performance hit either because the threads that would be processing the
> > join will continue processing the inputs to the join, so the overall
> > throughput shouldn’t be affected.
> >
> > After the sorting, you can kick off a task group that will compute the
> > results. One thing you’ll have to experiment with is how many tasks to
> > start: one for each pair of batches, or one for each left-side batch, or
> > one for each right-side batch. If it’s the first, it may be useful to
> > extend the TaskGroup interface to allow for two-dimensional task groups (so
> > that it would be easier to start a task for each pair).
> >
> > Sasha
> >
> > > On Apr 26, 2022, at 11:03 AM, Li Jin <ice.xell...@gmail.com> wrote:
> > >
> > >> In order to produce an output for a left batch, I would need to wait
> > >> until I received enough batches from the right tables to cover all
> > >> potential matches (wait until I have seen right timestamps outside the
> > >> matching range).
> > > To add a bit more explanation: let's say the time range of the current
> > > left batch is (20200101, 20200201). In order to produce the join result
> > > for this batch, I need to receive all data from the right tables from
> > > (-inf, 20200201), and I can know this once I have seen data from all
> > > right tables that have timestamps after 20200201 (since data arrives in
> > > time order). But I do not need to receive all the data in order to
> > > produce an output batch (unlike hash join).
> > >
> > > On Tue, Apr 26, 2022 at 1:59 PM Li Jin <ice.xell...@gmail.com> wrote:
> > >
> > >> Thanks both for the reply. To add a bit more context, I am trying to
> > >> implement an "asof join". Here I have one left table and n right
> > >> tables, and all batches arrive in time order.
> > >>
> > >> In order to produce an output for a left batch, I would need to wait
> > >> until I received enough batches from the right tables to cover all
> > >> potential matches (wait until I have seen right timestamps outside the
> > >> matching range).
> > >>
> > >> From both replies it sounds like I should just check whether I have
> > >> enough data in the InputReceived function and do the work there when I
> > >> do.
> > >>
> > >> However, one thing that I am not sure about is how the parallelization
> > >> comes into play - it sounds like InputReceived could be called by
> > >> multiple threads of the same input node for different batches?
> > >> Currently I am just trying to get a baseline implementation that has
> > >> one thread doing the join, so if InputReceived is called by multiple
> > >> threads I might end up blocking other threads unnecessarily.
> > >>
> > >> If I had a dedicated thread/executor that does the join, and
> > >> InputReceived just queued the batches and returned immediately, I feel
> > >> like it would be more efficient.
> > >>
> > >> Thoughts?
> > >>
> > >> Thanks,
> > >> Li
> > >>
> > >> On Mon, Apr 25, 2022 at 6:41 PM Sasha Krassovsky <
> > >> krassovskysa...@gmail.com> wrote:
> > >>
> > >>> If I understand correctly, on InputReceived you’ll be accumulating
> > >>> batches until you have enough to compute the next output? In that
> > >>> case, you have two options: you can either just immediately compute it
> > >>> using the same thread, or call the schedule_callback directly (not
> > >>> using the scheduler). I think your pseudocode is correct - since
> > >>> whether or not you can output the next batch can only change on
> > >>> InputReceived, that’s the only spot you need to check. I think an
> > >>> elaboration of your pseudocode could be something like:
> > >>>
> > >>> Status InputReceived(Batch b) {
> > >>>     std::unique_lock<std::mutex> lock(accum_lock);  // released on all paths
> > >>>     accum.push_back(b);
> > >>>     if (enough_inputs) {
> > >>>         std::vector<Batch> batches = std::move(accum);
> > >>>         lock.unlock();  // don't hold the lock while computing
> > >>>         compute_next_output(batches);
> > >>>     }
> > >>>     return Status::OK();
> > >>> }
> > >>>
> > >>> Sasha
> > >>>
> > >>>> On Apr 25, 2022, at 3:29 PM, Li Jin <ice.xell...@gmail.com> wrote:
> > >>>>
> > >>>> Thanks! That's super helpful.
> > >>>>
> > >>>> A follow up question on TaskScheduler - what's the correct way to
> > >>>> define a task that "does work if input batches are ready, otherwise
> > >>>> tries later"?
> > >>>>
> > >>>> Sth like
> > >>>>
> > >>>> Status try_process():
> > >>>>     if enough_inputs_to_produce_next_output:
> > >>>>         compute_and_produce_next_output();
> > >>>>         return Status::OK()
> > >>>>     else:
> > >>>>         # Is this right?
> > >>>>         # Exit and try later
> > >>>>         return Status::OK();
> > >>>>
> > >>>> If I register this function with TaskScheduler, I think it only gets
> > >>>> run once, so I think I might need to schedule the next task when
> > >>>> inputs are not ready, but I am not sure of the best way to do that.
> > >>>> Any suggestions?
> > >>>>
> > >>>> Li
> > >>>>
> > >>>> On Mon, Apr 25, 2022 at 6:18 PM Sasha Krassovsky <
> > >>>> krassovskysa...@gmail.com> wrote:
> > >>>>
> > >>>>> Hi Li,
> > >>>>> I’ll answer the questions in order:
> > >>>>>
> > >>>>> 1. Your guess is correct! The Hash Join may be used standalone
> > >>>>> (mostly in testing or benchmarking for now) or as part of the
> > >>>>> ExecNode. The ExecNode will pass the task to the Executor to be
> > >>>>> scheduled, or will run it immediately if it’s in sync mode (i.e. no
> > >>>>> executor). Our Hash Join benchmark uses OpenMP to schedule things,
> > >>>>> and passes a lambda that does OpenMP things to the HashJoin.
> > >>>>>
> > >>>>> 2. We might not have an executor if we want to execute
> > >>>>> synchronously. This is set during construction of the ExecContext,
> > >>>>> which is given to the ExecPlan during creation. If the ExecContext
> > >>>>> has a nullptr Executor, then we are in sync mode; otherwise we use
> > >>>>> the Executor to schedule. One confusing thing is that we also have a
> > >>>>> SerialExecutor - I’m actually not quite sure what the difference is
> > >>>>> between using that and setting the Executor to nullptr (might have
> > >>>>> something to do with testing?). @Weston probably knows
> > >>>>>
> > >>>>> 3. You can think of the TaskGroup as a “parallel for loop”. TaskImpl
> > >>>>> is the function that implements the work that needs to be split up,
> > >>>>> and TaskGroupContinuationImpl is what gets run after the for loop.
> > >>>>> TaskImpl will receive the index of the task. If you’re familiar with
> > >>>>> OpenMP, it’s equivalent to this:
> > >>>>>
> > >>>>> #pragma omp parallel for
> > >>>>> for (int i = 0; i < 100; i++)
> > >>>>>     TaskImpl(omp_get_thread_num(), i);
> > >>>>> TaskGroupContinuationImpl();
> > >>>>>
> > >>>>> Examples of the two are here:
> > >>>>>
> > >>>>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L416
> > >>>>>
> > >>>>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L458
> > >>>>> Sasha
> > >>>>>
> > >>>>>> On Apr 25, 2022, at 8:35 AM, Li Jin <ice.xell...@gmail.com> wrote:
> > >>>>>>
> > >>>>>> Hello!
> > >>>>>>
> > >>>>>> I am reading the use of TaskScheduler inside the C++ compute code
> > >>>>>> (reading hash join) and have some questions about it, in
> > >>>>>> particular:
> > >>>>>>
> > >>>>>> (1) What is the purpose of SchedulerTaskCallback defined here:
> > >>>>>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join_node.cc#L428
> > >>>>>> (My guess is that the caller of TaskScheduler::StartTaskGroup needs
> > >>>>>> to provide an implementation of a task executor, and the
> > >>>>>> implementation of SchedulerTaskCallback inside hash_join_node.cc is
> > >>>>>> just a vanilla implementation.)
> > >>>>>>
> > >>>>>> (2) When would this task context not have an executor?
> > >>>>>> https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/exec/hash_join_node.cc#L581
> > >>>>>>
> > >>>>>> (3) What's the difference between TaskImpl and
> > >>>>>> TaskGroupContinuationImpl in TaskScheduler::RegisterTaskGroup? And
> > >>>>>> how would one normally define TaskGroupContinuationImpl?
> > >>>>>>
> > >>>>>> Sorry, I am still learning the Arrow compute internals and
> > >>>>>> appreciate help understanding these.
> > >>>>>>
> > >>>>>> Li
> > >>>
> > >>>
> >
> >
