> In order to produce an output for a left batch, I would need to wait until
> I have received enough batches from the right tables to cover all potential
> matches (wait until I have seen right timestamps outside the matching range)

To add a bit more explanation: say the time range of the current left
batch is (20200101, 20200201). In order to produce the join result for this
batch, I need to receive all data from the right tables in (-inf,
20200201), and I know I have it once I have seen data from all right tables
with a timestamp after 20200201 (since data arrives in time order). But
I do not need to receive all data in order to produce an output batch
(unlike hash join).
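The readiness condition described above could be sketched as follows. This is a hypothetical helper (the names `CanProduceOutput` and `right_watermarks` are mine, not Arrow's), assuming each right input tracks the largest timestamp seen so far:

```cpp
#include <cstdint>
#include <vector>

// Sketch of the asof-join readiness check: right_watermarks[i] holds the
// largest timestamp seen so far from right table i. Because every input
// arrives in time order, once each right watermark is strictly past the
// left batch's upper bound, no further right rows can match this left batch.
bool CanProduceOutput(int64_t left_batch_end,
                      const std::vector<int64_t>& right_watermarks) {
  for (int64_t w : right_watermarks) {
    if (w <= left_batch_end) return false;  // this right table may still have matches
  }
  return true;
}
```

For example, with a left batch ending at 20200201, the check passes only once every right table has delivered a row timestamped after 20200201.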

On Tue, Apr 26, 2022 at 1:59 PM Li Jin <ice.xell...@gmail.com> wrote:

> Thanks both for the reply. To add a bit more context, I am trying to
> implement an "asof join". Here I have one left table and n right tables, and
> all batches arrive in time order.
>
> In order to produce an output for a left batch, I would need to wait until
> I have received enough batches from the right tables to cover all potential
> matches (wait until I have seen right timestamps outside the matching range).
>
> From both replies it sounds like I should just check whether I have enough
> data in the InputReceived function and do the work there when I do.
>
> However, one thing that I am not sure about is how parallelization comes
> into play - it sounds like InputReceived could be called by multiple
> threads of the same input node for different batches?
> Currently I am just trying to get a baseline implementation that has one
> thread doing the join, so if InputReceived is called by multiple threads I
> might end up blocking other threads unnecessarily.
>
> If I had a dedicated thread/executor that does the join, and InputReceived
> just queued the batches and returned immediately, I feel like it would be
> more efficient.
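The queue-plus-dedicated-thread idea above could be sketched roughly as below. This is a minimal illustration, not Arrow code: `Batch`, `SerialJoinRunner`, and the trivial "work" are placeholders, and a real node would propagate Status and backpressure:

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

struct Batch { int id; };  // stand-in for an Arrow ExecBatch

// One consumer thread drains a queue and does the (serial) join work, so
// upstream threads calling InputReceived only enqueue and return immediately.
class SerialJoinRunner {
 public:
  SerialJoinRunner() : worker_([this] { Run(); }) {}

  void InputReceived(Batch b) {  // may be called from any upstream thread
    {
      std::lock_guard<std::mutex> g(mu_);
      queue_.push_back(b);
    }
    cv_.notify_one();
  }

  void InputFinished() {  // no more batches; drain and stop the worker
    {
      std::lock_guard<std::mutex> g(mu_);
      done_ = true;
    }
    cv_.notify_one();
    worker_.join();
  }

  int processed() const { return processed_; }

 private:
  void Run() {
    for (;;) {
      std::unique_lock<std::mutex> lk(mu_);
      cv_.wait(lk, [this] { return !queue_.empty() || done_; });
      if (queue_.empty() && done_) return;  // drained and finished
      Batch b = queue_.front();
      queue_.pop_front();
      lk.unlock();       // do the work without holding the lock
      ++processed_;      // stand-in for the actual join work on b
    }
  }

  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<Batch> queue_;
  bool done_ = false;
  int processed_ = 0;
  std::thread worker_;  // declared last so other members exist before it starts
};
```

The tradeoff is the one discussed in the thread: upstream threads are never blocked on the join, at the cost of an extra thread and a hand-off through the queue.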
>
> Thoughts?
>
> Thanks,
> Li
>
> On Mon, Apr 25, 2022 at 6:41 PM Sasha Krassovsky <
> krassovskysa...@gmail.com> wrote:
>
>> If I understand correctly, on InputReceived you’ll be accumulating
>> batches until you have enough to compute the next output? In that case, you
>> have two options: you can either just immediately compute it using the same
>> thread, or call the schedule_callback directly (not using the scheduler). I
>> think your pseudocode is correct - since whether or not you can output the
>> next batch can only change on InputReceived, that’s the only spot you need
>> to check. I think an elaboration of your pseudocode could be something like:
>>
>> Status InputReceived(Batch b) {
>>     std::unique_lock<std::mutex> lock(accum_mutex);
>>     accum.push_back(b);
>>     if (enough_inputs) {
>>         std::vector<Batch> batches = std::move(accum);
>>         accum.clear();
>>         lock.unlock();  // don't hold the lock while computing
>>         compute_next_output(batches);
>>     }
>>     return Status::OK();
>> }
>>
>> Sasha
>>
>> > On Apr 25, 2022, at 3:29 PM, Li Jin <ice.xell...@gmail.com> wrote:
>> >
>> > Thanks! That's super helpful.
>> >
>> > A follow up question on TaskScheduler - What's the correct way to
>> define a
>> > task that "do work if input batches are ready, otherwise try later"?
>> >
>> > Something like:
>> >
>> > Status try_process():
>> >     if enough_inputs_to_produce_next_output:
>> >         compute_and_produce_next_output()
>> >         return Status::OK()
>> >     else:
>> >         # Is this right?
>> >         # Exit and try again later
>> >         return Status::OK()
>> >
>> > If I register this function with TaskScheduler, I think it only gets run
>> > once, so I might need to schedule the next task when inputs are not
>> > ready, but I am not sure of the best way to do that. Any suggestions?
>> >
>> > Li
>> >
>> > On Mon, Apr 25, 2022 at 6:18 PM Sasha Krassovsky <
>> krassovskysa...@gmail.com <mailto:krassovskysa...@gmail.com>>
>> > wrote:
>> >
>> >> Hi Li,
>> >> I’ll answer the questions in order:
>> >>
>> >> 1. Your guess is correct! The Hash Join may be used standalone (mostly
>> >> in testing or benchmarking for now) or as part of the ExecNode. The
>> >> ExecNode will pass the task to the Executor to be scheduled, or will run
>> >> it immediately if it’s in sync mode (i.e. no executor). Our Hash Join
>> >> benchmark uses OpenMP to schedule things, and passes a lambda that does
>> >> OpenMP things to the HashJoin.
>> >>
>> >> 2. We might not have an executor if we want to execute synchronously.
>> >> This is set during construction of the ExecContext, which is given to the
>> >> ExecPlan during creation. If the ExecContext has a nullptr Executor, then
>> >> we are in sync mode; otherwise we use the Executor to schedule. One
>> >> confusing thing is that we also have a SerialExecutor - I’m actually not
>> >> quite sure what the difference between using that and setting the
>> >> Executor to nullptr is (it might have something to do with testing?).
>> >> @Weston probably knows.
>> >>
>> >> 3. You can think of the TaskGroup as a “parallel for loop”. TaskImpl is
>> >> the function that implements the work that needs to be split up,
>> >> TaskGroupContinuationImpl is what gets run after the for loop. TaskImpl
>> >> will receive the index of the task. If you’re familiar with OpenMP,
>> it’s
>> >> equivalent to this:
>> >>
>> >> #pragma omp parallel for
>> >> for (int i = 0; i < 100; i++)
>> >>     TaskImpl(omp_get_thread_num(), i);
>> >> TaskGroupContinuationImpl();
>> >>
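The "parallel for plus continuation" pattern Sasha describes could be sketched without OpenMP as below. This is not the Arrow TaskScheduler API, just an illustration of the split between the per-task function and the continuation; spawning one thread per task is a deliberate simplification of what a real thread pool would do:

```cpp
#include <atomic>
#include <functional>
#include <thread>
#include <vector>

// Runs task_impl(i) for each task index in parallel, then runs continuation
// exactly once, after the last task finishes (mirroring TaskImpl vs
// TaskGroupContinuationImpl in TaskScheduler::RegisterTaskGroup).
void RunTaskGroup(int num_tasks,
                  const std::function<void(int)>& task_impl,
                  const std::function<void()>& continuation) {
  std::atomic<int> remaining{num_tasks};
  std::vector<std::thread> workers;
  for (int i = 0; i < num_tasks; ++i) {
    workers.emplace_back([&, i] {
      task_impl(i);
      // fetch_sub returns the previous value; the thread that drops the
      // counter from 1 to 0 finished last and runs the continuation.
      if (remaining.fetch_sub(1) == 1) continuation();
    });
  }
  for (auto& t : workers) t.join();
}
```

In other words, the continuation plays the role of the code after the OpenMP parallel-for region: it only runs once all task indices have been processed.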
>> >> Examples of the two are here:
>> >>
>> >> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L416
>> >>
>> >> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L458
>> >>
>> >> Sasha
>> >>
>> >>> On Apr 25, 2022, at 8:35 AM, Li Jin <ice.xell...@gmail.com> wrote:
>> >>>
>> >>> Hello!
>> >>>
>> >>> I am reading the use of TaskScheduler inside C++ compute code (reading
>> >> hash
>> >>> join) and have some questions about it, in particular:
>> >>>
>> >>> (1) What is the purpose of the SchedulerTaskCallback defined here:
>> >>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join_node.cc#L428
>> >>> (My guess is that the caller of TaskScheduler::StartTaskGroup needs to
>> >>> provide an implementation of a task executor, and the implementation of
>> >>> SchedulerTaskCallback inside hash_join_node.cc is just a vanilla
>> >>> implementation)
>> >>>
>> >>> (2) When would this task context not have an executor?
>> >>> https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/exec/hash_join_node.cc#L581
>> >>>
>> >>> (3) What's the difference between TaskImpl and TaskGroupContinuationImpl
>> >>> in TaskScheduler::RegisterTaskGroup? And how would one normally define
>> >>> TaskGroupContinuationImpl?
>> >>>
>> >>> Sorry, I am still learning the Arrow compute internals and would
>> >>> appreciate help understanding these.
>> >>>
>> >>> Li
>>
>>
