Thanks, Rahul. I assigned both BEAM-6114 <https://jira.apache.org/jira/browse/BEAM-6114> (split BeamJoinRel into multiple Rels for different join implementations) and BEAM-7835 <https://jira.apache.org/jira/browse/BEAM-7835> (add the join you want to support) to you.
-Rui

On Fri, Jul 26, 2019 at 6:04 PM rahul patwari <[email protected]> wrote:
> Thanks for your detailed explanation, Rui.
>
> Like you said, the triggers for the PCollections should be compatible with the "Slowly Changing Lookup Cache" pattern.
>
> Rui, if this feature makes sense, can you please create a JIRA for it?
>
> I will start working on splitting BeamJoinRel.java into specific implementations with SQL planner rules. I will also implement the "Slowly Changing Lookup Cache" pattern with SQL planner rules.
>
> Thanks,
> Rahul
>
> On Sat 27 Jul, 2019, 1:58 AM Rui Wang, <[email protected]> wrote:
>
>>> PCollection<Row> mainStream = ...
>>> PCollectionView<Map<K, Iterable<V>>> lookupStream = ... // Note: PCollectionView, not PCollection. I referred to PCollection before. And the PCollectionView should be of type Multimap to perform SideInputJoin.
>>> PCollectionTuple tuple = PCollectionTuple.of(new TupleTag("MainTable"), mainStream).and(new TupleTag("LookupTable"), lookupStream);
>>> // PCollectionTuple has to be enhanced to also take a PCollectionView as an argument.
>>> tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));
>>>
>>> And in BeamJoinRel.java, when a join has to be performed on a *PCollection* and a *PCollectionView* (instanceof check), SideInputJoin will be applied.
>>>
>>
>> Yes, I am thinking of something similar.
>>
>>> I think that performing SideInputJoin on two unbounded PCollections with different WindowFns (and satisfying the criteria for the "Slowly Changing Lookup Cache" pattern) is relatively straightforward if we take the *PCollection* itself as an argument for LookupTable in PCollectionTuple.
>>>
>> I think it's a hack in BeamJoinRel to check the WindowFn and perform a side input join when one side is unbounded with non-global windowing and the other side is unbounded with global windowing (and you likely also need to check triggers?).
>> For SQL, if you really want to do it, you should do it with planner rules that match exactly the case you want to support, and decouple this join implementation from BeamJoinRel.
>>
>> The current BeamJoinRel is already too large, and we should split it into different JoinRels that match different plans.
>>
>>> The conversion of PCollection to PCollectionView is hidden from the user in this case (it will be performed internally by SideInputJoin). Moreover, if the user wants to perform some SQL aggregations on "lookupStream" before performing the join with "mainStream" (multiple SQL queries separated by ";"), it is possible in this case, as "lookupStream" is a PCollection. But it is not possible if "lookupStream" is a PCollectionView.
>>>
>> It's true that PCollectionView will limit further SQL operations. The workaround is to do those operations in Java before using SqlTransform, and within SqlTransform, start with the join.
>>
>> So if your use case is to support general SQL operations on two unbounded PCollections, with the special need to perform a side input join between them when a particular WindowFn (and maybe trigger) setting is detected, the best way is to define SQL planner rules and have a separate Rel implementation.
>>
>> -Rui
>>
>>> Regards,
>>> Rahul
>>>
>>> On Fri, Jul 26, 2019 at 9:19 AM Rui Wang <[email protected]> wrote:
>>>
>>>> I see.
>>>>
>>>> Actually, I was still referring to making "LookupStream" a PCollectionView to perform the side input join, which then doesn't have the WindowFn mismatch problem. Otherwise, we shouldn't check for a special case of WindowFn to decide whether to perform a side input join for two unbounded PCollections whose WindowFns do not match.
>>>>
>>>> And what "data completeness" really means is that the side input is triggered, so it can change, and then the question is: when the side input changes, should we refine previous data?
>>>> It becomes harder to reason about at this point.
>>>>
>>>> Rui
>>>>
>>>> On Thu, Jul 25, 2019 at 6:17 PM rahul patwari <[email protected]> wrote:
>>>>
>>>>> "*In terms of join semantics, I think it's hard to reason about data completeness since one side of the join is changing*"
>>>>> - Since it is possible to apply [Global Windows with Non-Default Trigger] to an unbounded data source such as Kafka, how do we distinguish that Kafka PCollection from a "Slowly Changing Lookup Cache" unbounded PCollection? If we check that one of the PCollections being joined has the windowing [Global Windows with Trigger Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane())], is that sufficient to perform the join of "MainStream" and this "LookupStream"?
>>>>>
>>>>> In other words, instead of directly throwing an exception <https://github.com/apache/beam/blob/f03b6ba12e7c0a1005504612cc6067eebec9ffe8/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L359> when joining two unbounded PCollections with different WindowFns, if we can ensure that
>>>>> MainStream: one side of the join is unbounded with [Non-Global Windows with Default Trigger], and
>>>>> LookupStream: the other side of the join is a "Slowly Changing Lookup Cache" [Global Windows with Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()) Trigger],
>>>>> we can directly perform a SideInputJoin.
>>>>>
>>>>> Will we have a "data completeness" problem even in the "Slowly Changing Lookup Cache" pattern?
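The two conditions Rahul lists could be expressed as a predicate that a dedicated planner rule checks before choosing a side input join, in line with Rui's suggestion to match exactly the supported case. The following is a minimal plain-Java sketch, not Beam or Calcite code: `SideInputJoinRule`, `InputTraits`, `Windowing`, `TriggerKind` and `sideInputSide` are hypothetical names that model an input's boundedness and windowing strategy.

```java
import java.util.Objects;

// Hypothetical model of what a dedicated planner rule might inspect on each join input.
// These types are illustrative only; they are not Beam's WindowingStrategy or Calcite's RelNode.
class SideInputJoinRule {

    enum Windowing { GLOBAL, NON_GLOBAL }

    enum TriggerKind {
        DEFAULT,
        // Stands in for Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()).
        REPEATED_PROCESSING_TIME,
        OTHER
    }

    enum Side { LEFT, RIGHT, NONE }

    static final class InputTraits {
        final boolean unbounded;
        final Windowing windowing;
        final TriggerKind trigger;

        InputTraits(boolean unbounded, Windowing windowing, TriggerKind trigger) {
            this.unbounded = unbounded;
            this.windowing = Objects.requireNonNull(windowing);
            this.trigger = Objects.requireNonNull(trigger);
        }
    }

    // "MainStream": unbounded, non-global windows, default trigger.
    static boolean isMainStream(InputTraits t) {
        return t.unbounded && t.windowing == Windowing.NON_GLOBAL && t.trigger == TriggerKind.DEFAULT;
    }

    // "LookupStream": unbounded, global window, repeated processing-time trigger.
    static boolean isSlowlyChangingLookup(InputTraits t) {
        return t.unbounded && t.windowing == Windowing.GLOBAL
                && t.trigger == TriggerKind.REPEATED_PROCESSING_TIME;
    }

    // Returns which input should become the side input, or NONE if the rule does not match
    // and the planner should fall back to another join implementation.
    // Join type (inner vs. outer) is ignored here; a real rule would also have to check it.
    static Side sideInputSide(InputTraits left, InputTraits right) {
        if (isMainStream(left) && isSlowlyChangingLookup(right)) {
            return Side.RIGHT;
        }
        if (isSlowlyChangingLookup(left) && isMainStream(right)) {
            return Side.LEFT;
        }
        return Side.NONE;
    }

    public static void main(String[] args) {
        InputTraits events = new InputTraits(true, Windowing.NON_GLOBAL, TriggerKind.DEFAULT);
        InputTraits lookup = new InputTraits(true, Windowing.GLOBAL, TriggerKind.REPEATED_PROCESSING_TIME);
        System.out.println(sideInputSide(events, lookup)); // RIGHT
        System.out.println(sideInputSide(events, events)); // NONE
    }
}
```

Matching on the trigger shape is only a heuristic: as Rahul points out, a Kafka stream could be given the same global-window trigger, so windowing alone cannot tell the rule how fast the lookup side actually changes.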
>>>>>
>>>>> On Fri, Jul 26, 2019 at 2:51 AM Rui Wang <[email protected]> wrote:
>>>>>
>>>>>> To be clearer, I think it would be useful if we could achieve the following, which you wrote:
>>>>>>
>>>>>> PCollection<Row> mainStream = ...;
>>>>>> PCollection<Row> lookupStream = ...;
>>>>>> PCollectionTuple tuple = PCollectionTuple.of(new TupleTag<>("MainTable"), mainStream).and(new TupleTag<>("LookupTable"), lookupStream);
>>>>>> tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));
>>>>>>
>>>>>> -Rui
>>>>>>
>>>>>> On Thu, Jul 25, 2019 at 1:56 PM Rui Wang <[email protected]> wrote:
>>>>>>
>>>>>>> Hi Rahul, thanks for your detailed writeup. It pretty much summarizes the slowly changing table join problem.
>>>>>>>
>>>>>>> To your question, "Can we implement SideInputJoin for this case?", there are two perspectives.
>>>>>>>
>>>>>>> In terms of implementing the slowly changing lookup cache pattern <https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs> in BeamSQL, such a side input join can be done that way. At least it is worth exploring until we identify blockers. I also think this pattern is already useful to users.
>>>>>>>
>>>>>>> In terms of join semantics, I think it's hard to reason about data completeness since one side of the join is changing.
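To make the data-completeness concern concrete, here is a plain-Java simulation with no Beam APIs; `RefreshingLookup`, `refresh` and `enrich` are hypothetical names. Main-stream keys are enriched against whatever lookup snapshot is current, and the snapshot is swapped wholesale when the lookup side "fires". The same key joined before and after a refresh yields different rows, and rows emitted earlier are never revisited, which is exactly the "should we refine previous data?" question raised in the thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

class RefreshingLookup {
    // Current snapshot of the lookup side; replaced wholesale on each simulated trigger firing.
    private final AtomicReference<Map<String, String>> snapshot;

    RefreshingLookup(Map<String, String> initial) {
        this.snapshot = new AtomicReference<>(Map.copyOf(initial));
    }

    // Simulates the lookup side's trigger firing with a new version of the table.
    void refresh(Map<String, String> next) {
        snapshot.set(Map.copyOf(next));
    }

    // Enriches one main-stream key against the snapshot that is current right now.
    // Missing keys are shown explicitly (left-outer style) so the effect is visible.
    String enrich(String key) {
        String value = snapshot.get().get(key);
        return key + "->" + (value == null ? "<missing>" : value);
    }

    public static void main(String[] args) {
        RefreshingLookup lookup = new RefreshingLookup(Map.of("user1", "gold"));
        List<String> output = new ArrayList<>();
        output.add(lookup.enrich("user1")); // joined against version 1
        output.add(lookup.enrich("user2")); // no match yet
        lookup.refresh(Map.of("user1", "silver", "user2", "gold"));
        output.add(lookup.enrich("user1")); // same key, different result
        output.add(lookup.enrich("user2"));
        // Earlier outputs are not retracted or refined.
        System.out.println(output); // [user1->gold, user2-><missing>, user1->silver, user2->gold]
    }
}
```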
>>>>>>>
>>>>>>> -Rui
>>>>>>>
>>>>>>> On Thu, Jul 25, 2019 at 12:55 PM rahul patwari <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi Kenn,
>>>>>>>>
>>>>>>>> If we consider the following two *Unbounded* PCollections:
>>>>>>>> - PCollection1 => [*Non-Global* Window with Default Trigger]
>>>>>>>> - PCollection2 => [Global Window with *Non-Default* Trigger] :) coincidentally turned out to be the opposite
>>>>>>>>
>>>>>>>> Joining these two PCollections in BeamSql is currently not possible because of https://jira.apache.org/jira/browse/BEAM-3345 (WindowFn mismatch).
>>>>>>>> But in this case, PCollection1 can be joined with PCollection2 using SideInputJoin (https://github.com/apache/beam/blob/c7911043510a266078a3dc8faef7a1dbe1f598c5/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L456), which is already done for joining an unbounded PCollection with a bounded PCollection. I think Beam can guarantee that it joins all input elements once per window in this case.
>>>>>>>> The result of the join might be fuzzy for the window during which the trigger for PCollection2 fires and the side input gets loaded into memory.
>>>>>>>>
>>>>>>>> PCollection2 can be considered a slowly changing lookup cache, and BeamSql could then support the pattern https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs, which is currently not possible.
>>>>>>>> I am working on https://jira.apache.org/jira/browse/BEAM-7758 for BeamSql to natively support PCollectionView, so that BeamSql supports the "Slowly Updating Global Window Side Input Pattern" using SqlTransform's TableProvider.
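The side input join described above keeps the main stream's own windows and reads the lookup side as a broadcast multimap, the `Map<K, Iterable<V>>` shape proposed elsewhere in this thread. Below is a minimal plain-Java sketch of those semantics, assuming fixed windows on the main side, an inner join and a single lookup snapshot; `MultimapSideInputJoin`, `Event`, `windowStart` and `join` are hypothetical names, and this is not BeamJoinRel's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class MultimapSideInputJoin {

    // A main-stream element: join key, payload and event timestamp in milliseconds.
    static final class Event {
        final String key;
        final String payload;
        final long timestampMillis;

        Event(String key, String payload, long timestampMillis) {
            this.key = key;
            this.payload = payload;
            this.timestampMillis = timestampMillis;
        }
    }

    // Start of the fixed window containing ts; floorMod keeps negative timestamps correct.
    static long windowStart(long ts, long sizeMillis) {
        return ts - Math.floorMod(ts, sizeMillis);
    }

    // Inner-joins each main element with every lookup value for its key and groups the
    // joined rows by the main element's window. The lookup multimap plays the role of the
    // side input: it is only read, never windowed or grouped itself.
    static Map<Long, List<String>> join(List<Event> main, Map<String, List<String>> lookup,
                                        long windowSizeMillis) {
        Map<Long, List<String>> byWindow = new TreeMap<>();
        for (Event e : main) {
            for (String v : lookup.getOrDefault(e.key, List.of())) {
                byWindow.computeIfAbsent(windowStart(e.timestampMillis, windowSizeMillis),
                                         w -> new ArrayList<>())
                        .add(e.key + ":" + e.payload + "+" + v);
            }
        }
        return byWindow;
    }

    public static void main(String[] args) {
        Map<String, List<String>> lookup = Map.of(
                "a", List.of("x", "y"), // multimap: one key, several lookup rows
                "b", List.of("z"));
        List<Event> main = List.of(
                new Event("a", "1", 1_000L),
                new Event("b", "2", 61_000L),
                new Event("c", "3", 62_000L)); // no lookup row: dropped by the inner join
        System.out.println(join(main, lookup, 60_000L)); // {0=[a:1+x, a:1+y], 60000=[b:2+z]}
    }
}
```

In a streaming pipeline each window would see whichever lookup snapshot is current when its elements are processed, which is where the "fuzzy" result for the window in which the lookup trigger fires comes from.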
>>>>>>>>
>>>>>>>> If we can support this, the user will be able to do:
>>>>>>>> PCollection<Row> mainStream = ...;
>>>>>>>> PCollection<Row> lookupStream = ...;
>>>>>>>> PCollectionTuple tuple = PCollectionTuple.of(new TupleTag<>("MainTable"), mainStream).and(new TupleTag<>("LookupTable"), lookupStream);
>>>>>>>> tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));
>>>>>>>>
>>>>>>>> Can we implement SideInputJoin for this case?
>>>>>>>> I might be wrong in my understanding. Please let me know your thoughts.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Rahul
>>>>>>>>
>>>>>>>> On Thu, Jul 25, 2019 at 9:28 AM Kenneth Knowles <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> I think the best way to approach this is probably to have an example SQL statement and to discuss what the relational semantics should be.
>>>>>>>>>
>>>>>>>>> Windowing is not really part of SQL (yet), and in a way it just needs very minimal extensions. See https://arxiv.org/abs/1905.12133. In this proposal for SQL, windowed aggregation is explicitly part of the GROUP BY operation, where you GROUP BY window columns that were added. So it is more explicit than in Beam. Relations do not have a WindowFn, so there is no problem of them being incompatible.
>>>>>>>>>
>>>>>>>>> With Beam SQL there are basically two ways of windowing that work totally differently:
>>>>>>>>>
>>>>>>>>> 1. SQL-style windowing, where you GROUP BY windows. This does not use the input PCollection's WindowFn.
>>>>>>>>> 2. PCollection windowing, where the SQL does not do any windowing. This should apply the SQL expression to each window independently.
>>>>>>>>>
>>>>>>>>> In order to support a hybrid of these, it might be:
>>>>>>>>>
>>>>>>>>> 3. SQL-style windowing, where, when a PCollection has a window assigned, the window columns are added before the SQL is applied.
>>>>>>>>> It is a bit strange but might enable your use case.
>>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>> On Mon, Jul 22, 2019 at 10:39 AM rahul patwari <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Beam currently doesn't support joins of unbounded PCollections with different WindowFns (https://beam.apache.org/documentation/programming-guide/#groupbykey-and-unbounded-pcollections).
>>>>>>>>>>
>>>>>>>>>> BeamSql performs [Unbounded PCollection] JOIN [Bounded PCollection] by performing a 'SideInputJoin' with the bounded PCollection as a side input.
>>>>>>>>>>
>>>>>>>>>> Can we support [Unbounded PCollection] JOIN [Unbounded PCollection], when one of the unbounded PCollections has [GlobalWindows applied with a non-default trigger (probably a slowly changing lookup cache https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs)], by performing a 'SideInputJoin'?
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Rahul
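Kenn's third option, adding window columns to each row before the SQL runs so that windowing becomes an ordinary GROUP BY, can be illustrated without Beam. The sketch below is plain Java and assumes fixed (tumbling) windows; `WindowColumns`, `Row`, `withWindowColumns` and `countPerKeyAndWindow` are hypothetical names, not Beam SQL APIs. It assigns `windowStart`/`windowEnd` as plain fields and then counts rows per (key, windowStart), which is what a `GROUP BY key, window_start` over those columns would compute.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class WindowColumns {

    // A row whose window columns were added before any "SQL" runs over it.
    static final class Row {
        final String key;
        final long eventTimeMillis;
        final long windowStart; // added column, inclusive
        final long windowEnd;   // added column, exclusive

        private Row(String key, long eventTimeMillis, long windowStart, long windowEnd) {
            this.key = key;
            this.eventTimeMillis = eventTimeMillis;
            this.windowStart = windowStart;
            this.windowEnd = windowEnd;
        }

        // Assigns the tumbling window of the given size as ordinary columns.
        static Row withWindowColumns(String key, long eventTimeMillis, long sizeMillis) {
            long start = eventTimeMillis - Math.floorMod(eventTimeMillis, sizeMillis);
            return new Row(key, eventTimeMillis, start, start + sizeMillis);
        }
    }

    // Equivalent of: SELECT key, window_start, COUNT(*) ... GROUP BY key, window_start
    static Map<String, Long> countPerKeyAndWindow(List<Row> rows) {
        Map<String, Long> counts = new TreeMap<>();
        for (Row r : rows) {
            counts.merge(r.key + "@" + r.windowStart, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
                Row.withWindowColumns("a", 5_000L, 60_000L),
                Row.withWindowColumns("a", 59_999L, 60_000L),
                Row.withWindowColumns("a", 60_000L, 60_000L),
                Row.withWindowColumns("b", 10_000L, 60_000L));
        System.out.println(countPerKeyAndWindow(rows)); // {a@0=2, a@60000=1, b@0=1}
    }
}
```

Once the window is an ordinary column, compatibility becomes a question of which columns a query groups or joins on, in line with Kenn's remark that relations do not have a WindowFn.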
