Thanks Rahul.

I assigned both BEAM-6114 <https://jira.apache.org/jira/browse/BEAM-6114>(split
join multiple rel for different join implementation) and BEAM-7835
<https://jira.apache.org/jira/browse/BEAM-7835> (add the join you want to
support) to you.


-Rui

On Fri, Jul 26, 2019 at 6:04 PM rahul patwari <[email protected]>
wrote:

> Thanks for your detailed explanation Rui.
>
> Like you said, the triggers for the PCollections should be compatible with
> "Slowly Changing Lookup Cache" pattern.
>
> Rui, If this feature makes sense, can you please create a JIRA for it.
>
> I will start working on splitting BeamJoinRel.java to specific
> implementations with SQL planner rules. I will also implement the "Slowly
> Changing Lookup Cache" pattern with SQL planner rules.
>
> Thanks,
> Rahul
>
> On Sat 27 Jul, 2019, 1:58 AM Rui Wang, <[email protected]> wrote:
>
>>
>>
>>> PCollection<Row> mainStream = ...
>>> *PCollectionView<Map<K, Iterable<V>>>* lookupStream = ...      // Note:
>>> PCollectionView not PCollection. I have referred to PCollection before. And 
>>> *PCollectionView
>>> should be of type Multimap*, to perform SideinputJoin.
>>> PCollectionTuple tuple = PCollectionTuple.of(new TupleTag("MainTable"),
>>> mainStream)).and(new TupleTag("LookupTable"), lookupStream);
>>> //PCollectionTuple has to be enhanced to take PCollectionView also as an
>>> argument.
>>> tuple.apply(SqlTransform.of("MainTable JOIN LookupTable"));
>>>
>> and in BeamJoinRel.java, when Join has to be performed on a *PCollection*
>>> and a *PCollectionView*(instanceof check), SideInputJoin will be
>>> applied.
>>>
>>
>> Yes, I am thinking something similar to it.
>>
>>
>>
>>> I think that performing SideInputJoin on two unbounded PCollections with
>>> different WindowFn(and satisfying the criteria for "Slowly Changing Lookup
>>> Cache Pattern") is relatively straight forward if we take *PCollection*
>>> itself as an argument for LookupTable in PCollectionTuple.
>>>
>> I think it's a hack in BeamJoinRel to check WindowFn and perform
>> SideInput when one side is unbounded non-global windowing, another side is
>> unbounded global windowing (and likely you need also check triggers?). For
>> SQL, if you really want to do it, you should do it by planner rules to
>> match exactly the case you want to support and decouple this join
>> implementation from BeamJoinRel.
>>
>> Even current BeamJoinRel is too large and we should split it to different
>> JoinRel to match different plans.
>>
>>
>>
>>> The conversion of PCollection to PCollectionView is hidden for the user
>>> in this case(Which will be performed internally by SideInputJoin).
>>> Moreover, if the user wants to perform some SQL Aggregations on
>>> "lookupStream" before performing Join with "mainStream"(Multiple SQL
>>> Queries separated by ";"), it is possible in this case, as the
>>> "lookupStream" is a PCollection. But, it is not possible if the
>>> "lookupStream" is a PCollectionView.
>>>
>> It's true that PCollectionView will limit further SQL operations. The
>> workaround is do those operations by java before using SqlTransform, and
>> within SqlTransfrom, start with the Join.
>>
>>
>> So if your use case is support a general SQL operations on two unbounded
>> PCollections but with a special need that to perform a SideInput join for
>> these two unbounded PColleciton with a special WindowFn setting (maybe even
>> trigger) checking, the best way then is to define SQL plan rules and have a
>> separate Rel implementation.
>>
>>
>>
>> -Rui
>>
>>
>>
>>
>>> Regards,
>>> Rahul
>>>
>>> On Fri, Jul 26, 2019 at 9:19 AM Rui Wang <[email protected]> wrote:
>>>
>>>> I see.
>>>>
>>>> Actually I was still referring to make "LookupStream" as
>>>> PCollectionView to perform sideinput join, which then doesn't have mismatch
>>>> WindowFn problem. Otherwise, we shouldn't check special case of WindowFn to
>>>> decide if perform a sideinput join for two unbounded PCollection when their
>>>> WindowFn does not match.
>>>>
>>>> And "data completeness" really means is sideinput is triggered so it
>>>> could change, and then the question is when sideinput is changed, should we
>>>> refine previous data? It becomes harder to reason at this moment.
>>>>
>>>>
>>>> Rui
>>>>
>>>> On Thu, Jul 25, 2019 at 6:17 PM rahul patwari <
>>>> [email protected]> wrote:
>>>>
>>>>> "*In terms of Join schematic, I think it's hard to reason data
>>>>> completeness since one side of the join is changing*"
>>>>> - As it is possible to apply [Global Windows with Non-Default Trigger]
>>>>> to Unbounded Data Source, say, Kafka, to distinguish this Kafka 
>>>>> PCollection
>>>>> from "Slowly Changing lookup cache" Unbounded PCollection,  If we can 
>>>>> check
>>>>> the condition that one of the PCollection being Joined have WindowFn as
>>>>> [Global Windows with Trigger Repeatedly.forever(AfterProcessingTime.
>>>>> pastFirstElementInPane())] is it sufficient to perform the Join of
>>>>> "MainStream" and this "LookupStream"?
>>>>>
>>>>> In other words, I mean to say that instead of directly throwing
>>>>> Exception
>>>>> <https://github.com/apache/beam/blob/f03b6ba12e7c0a1005504612cc6067eebec9ffe8/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L359>
>>>>>  when
>>>>> Joining two Unbounded PCollections with different WindowFns, If we can
>>>>> ensure that
>>>>> MainStream: one side of the join is Unbounded with WindowFn as
>>>>> [Non-Global Windows with DefaultTrigger] and
>>>>> LookupStream: the other side of the Join is a "Slowly Changing Lookup
>>>>> Cache"[Global Windows with Repeatedly.forever(AfterProcessingTime.
>>>>> pastFirstElementInPane()) Trigger],
>>>>> we can directly perform a SideInputJoin.
>>>>>
>>>>> Will we have "data completeness" problem even in "Slowly Changing
>>>>> lookup Cache Pattern"?
>>>>>
>>>>> On Fri, Jul 26, 2019 at 2:51 AM Rui Wang <[email protected]> wrote:
>>>>>
>>>>>> To be more clear, I think it's useful if we can achieve the following
>>>>>> that you wrote
>>>>>>
>>>>>> PCollection mainStream = ...;
>>>>>> PCollection lookupStream = ...;
>>>>>> PCollectionTuple tuple = PCollectionTuple.of(new
>>>>>> TupleTag("MainTable"), new TupleTag("LookupTable"));
>>>>>> tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));
>>>>>>
>>>>>> -Rui
>>>>>>
>>>>>> On Thu, Jul 25, 2019 at 1:56 PM Rui Wang <[email protected]> wrote:
>>>>>>
>>>>>>> Hi Rahul, thanks for your detailed writeup. It pretty much
>>>>>>> summarizes the slow changing table join problem.
>>>>>>>
>>>>>>> To your question: "Can we implement SideInputJoin for this case",
>>>>>>> there are two perspectives.
>>>>>>>
>>>>>>> In terms of implementing the slowing changing lookup cache pattern
>>>>>>> <https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs>
>>>>>>>  in
>>>>>>> BeamSQL, such sidinput join can be done that way. At least it worth
>>>>>>> exploring it until we identify blockers. I also think this pattern is
>>>>>>> already useful to users.
>>>>>>>
>>>>>>> In terms of Join schematic, I think it's hard to reason data
>>>>>>> completeness since one side of join is changing.
>>>>>>>
>>>>>>> -Rui
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jul 25, 2019 at 12:55 PM rahul patwari <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Hi Kenn,
>>>>>>>>
>>>>>>>> If we consider the following two *Unbounded* PCollections:
>>>>>>>> - PCollection1 => [*Non-Global* Window with Default Trigger]
>>>>>>>> - PCollection2 => [Global Window with *Non-Default* Trigger] :)
>>>>>>>> coincidentally turned out to be the opposite
>>>>>>>>
>>>>>>>> Joining these two PCollections in BeamSql currently is not possible
>>>>>>>> because of https://jira.apache.org/jira/browse/BEAM-3345(WindowFn
>>>>>>>> Mismatch)
>>>>>>>> But in this case, PCollection1 can be joined with PCollection2
>>>>>>>> using SideInputJoin (
>>>>>>>> https://github.com/apache/beam/blob/c7911043510a266078a3dc8faef7a1dbe1f598c5/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L456),
>>>>>>>> which is being done for Joining an Unbounded PCollection with Bounded
>>>>>>>> PCollection. I am thinking that Beam can guarantee it joins all
>>>>>>>> input elements once per window for this case.
>>>>>>>> The result of the join might be fuzzy for the window when the
>>>>>>>> Trigger for PCollection2 fires and sideinput gets loaded into Memory.
>>>>>>>>
>>>>>>>> PCollection2 can be considered as Slowly Changing Lookup Cache and
>>>>>>>> BeamSql can support Pattern:
>>>>>>>> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs,
>>>>>>>> which is currently not possible.
>>>>>>>> I am working on https://jira.apache.org/jira/browse/BEAM-7758 for
>>>>>>>> BeamSql to natively support PCollectionView so that BeamSql supports
>>>>>>>> "Slowly Updating Global Window Sideinput Pattern" using SqlTransform's
>>>>>>>> TableProvider.
>>>>>>>>
>>>>>>>> If we can support this, User will be able to do:
>>>>>>>> PCollection mainStream = ...;
>>>>>>>> PCollection lookupStream = ...;
>>>>>>>> PCollectionTuple tuple = PCollectionTuple.of(new
>>>>>>>> TupleTag("MainTable"), new TupleTag("LookupTable"));
>>>>>>>> tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));
>>>>>>>>
>>>>>>>> Can we implement SideInputJoin for this case?
>>>>>>>> I might be wrong in my understanding. Please let me know your
>>>>>>>> thoughts.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Rahul
>>>>>>>>
>>>>>>>> On Thu, Jul 25, 2019 at 9:28 AM Kenneth Knowles <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I think the best way to approach this is probably to have an
>>>>>>>>> example SQL statement and to discuss what the relational semantics 
>>>>>>>>> should
>>>>>>>>> be.
>>>>>>>>>
>>>>>>>>> Windowing is not really part of SQL (yet) and in a way it just
>>>>>>>>> needs very minimal extensions. See
>>>>>>>>> https://arxiv.org/abs/1905.12133. In this proposal for SQL,
>>>>>>>>> windowed aggregation is explicitly be part of the GROUP BY operation, 
>>>>>>>>> where
>>>>>>>>> you GROUP BY window columns that were added. So it is more explicit 
>>>>>>>>> than in
>>>>>>>>> Beam. Relations do not have a WindowFn so there is no problem of them 
>>>>>>>>> being
>>>>>>>>> incompatible.
>>>>>>>>>
>>>>>>>>> With Beam SQL there are basically two ways of windowing that work
>>>>>>>>> totally differently:
>>>>>>>>>
>>>>>>>>> 1. SQL style windowing where you GROUP BY windows. This does not
>>>>>>>>> use the input PCollection windowfn
>>>>>>>>> 2. PCollection windowing where the SQL does not do any windowing -
>>>>>>>>> this should apply the SQL expression to each window independently
>>>>>>>>>
>>>>>>>>> In order to support a hybrid of these, it might be:
>>>>>>>>>
>>>>>>>>> 3. SQL style windowing, where when a PCollection has window
>>>>>>>>> assigned, the window columns are added before the SQL is applied. It 
>>>>>>>>> is a
>>>>>>>>> bit strange but might enable your use.
>>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>> On Mon, Jul 22, 2019 at 10:39 AM rahul patwari <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Beam currently doesn't support Join of Unbounded PCollections of
>>>>>>>>>> different WindowFns (
>>>>>>>>>> https://beam.apache.org/documentation/programming-guide/#groupbykey-and-unbounded-pcollections
>>>>>>>>>> ).
>>>>>>>>>>
>>>>>>>>>> BeamSql performs [Unbounded PCollection] JOIN [Bounded
>>>>>>>>>> PCollection], by performing 'SideInputJoin' with Bounded PCollection 
>>>>>>>>>> as a
>>>>>>>>>> SideInput.
>>>>>>>>>>
>>>>>>>>>> Can we support [Unbounded PCollection] JOIN [Unbounded
>>>>>>>>>> PCollection], when one of the Unbounded PCollection has 
>>>>>>>>>> [GlobalWindows
>>>>>>>>>> Applied with Non-Default Trigger(probably a slow-changing lookup 
>>>>>>>>>> cache
>>>>>>>>>> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs)]
>>>>>>>>>> by performing 'SideInputJoin'?
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Rahul
>>>>>>>>>>
>>>>>>>>>

Reply via email to