I agree that in batch the unbounded disorder will prevent the approach in (1) unless the input is sorted. In streaming it works well using watermarks. This is not a reason to reject (1).
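As a rough illustration of why watermarks make the streaming side tractable, here is a minimal sketch of the per-key buffering that the time-distance join described further down this thread would need. It is plain Java, not Beam API; the class and method names are hypothetical, and the "keep an element at time t until t + 2d" retention rule is simply the bound quoted later in the thread. In a real pipeline this would live in the state of a stateful DoFn.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical per-key buffer for a time-distance join; not a Beam API. */
class TimeDistanceJoinBuffer {
  private final long d; // maximum event-time distance for a match (assumed d > 0)
  private final NavigableMap<Long, List<String>> lhs = new TreeMap<>();
  private final NavigableMap<Long, List<String>> rhs = new TreeMap<>();

  TimeDistanceJoinBuffer(long d) {
    this.d = d;
  }

  /** Buffers an element and returns every pair it completes, LHS value first. */
  List<String[]> add(boolean isLhs, long timestamp, String value) {
    NavigableMap<Long, List<String>> own = isLhs ? lhs : rhs;
    NavigableMap<Long, List<String>> other = isLhs ? rhs : lhs;
    own.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);

    List<String[]> matches = new ArrayList<>();
    // Range query on the other side: abs(timestamp - t) < d, bounds exclusive.
    for (List<String> bucket :
        other.subMap(timestamp - d, false, timestamp + d, false).values()) {
      for (String o : bucket) {
        matches.add(isLhs ? new String[] {value, o} : new String[] {o, value});
      }
    }
    return matches;
  }

  /** Garbage-collects every element whose timestamp t satisfies t + 2d < watermark. */
  void onWatermark(long watermark) {
    lhs.headMap(watermark - 2 * d, false).clear();
    rhs.headMap(watermark - 2 * d, false).clear();
  }

  /** Number of distinct timestamps still buffered on both sides. */
  int bufferedTimestamps() {
    return lhs.size() + rhs.size();
  }
}
```

With bounded disorder the watermark keeps advancing, so `onWatermark` keeps the buffers small and matches are emitted as soon as both elements have arrived. If the watermark effectively only moves at the end of the input, as with unsorted bounded data, nothing is dropped and the buffer grows to hold the whole key.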
(1.1) Instead, it might make sense to have an annotation that is a hint for *batch* to timesort the input to a stateful DoFn, but it will be ignored in streaming. The DoFn will still be written to be robust to disorder.

(1.2) Most streaming use cases for stateful DoFn probably work with normal stream disorder but not with total disorder. So it probably almost always makes sense to timesort the input to any stateful DoFn in batch. I had forgotten that the DataflowRunner already does this [1]. It also always sorts in a batch GBK shuffle to process windowing efficiently [2]. Certainly, sorting in these cases should be done by the runner.

(2.1) It makes sense to expose a generic "GroupByKeyAndSortValues" operation. There have been some discussions. I don't think it applies, necessarily. In batch you wouldn't necessarily need sorting for this specific use case. You need to evaluate a condition like "WHERE abs(x.timestamp - y.timestamp) < n". There are other join techniques that allow you to partition and join data with conditions like this. The key insight is that for bounded data you can treat the timestamp field as just another data field, and this is just a condition without any special properties.

So I think sorting in batch is useful for the *generic* use case of a stateful DoFn, but it is probably neither needed nor the best choice for this join specifically.

So finally (3) is not necessary because there are many options.

But, meta-point: a couple of times it has been suggested that no one objects to this and we can go ahead. Normally I figure that if something is useful and doesn't break anything it is OK to have somewhere, as long as the maintenance burden is low. But in this case, a key advantage of Beam's model is that watermarks allow lower latency and drop less data than previous approaches like a sort buffer.
So I do think a much higher burden of proof applies for something that abandons those benefits, especially if it makes it easy for users to do it when they may not even need to.

Kenn

[1] https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java#L266
[2] https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowViaIteratorsFn.java

On Thu, Nov 28, 2019 at 4:19 AM David Morávek <[email protected]> wrote:

> Agreed with Jan. This kind of use case requires having incoming elements ordered by timestamp. Only effective solution is to delegate sorting to the runner, which is currently impossible. Introducing an "annotation" that would guarantee event time order looks like a nice clean to solve this. :+1:
>
> I'd love to see this effort moving forward, are there any objections against this I'm not aware of (looking at the previous discussions I didn't find any)?
>
> D.
>
> On Thu, Nov 28, 2019 at 10:46 AM Jan Lukavský <[email protected]> wrote:
>
>> Hi Reza,
>> On 11/28/19 8:16 AM, Reza Rokni wrote:
>>
>> Hi,
>>
>> With regards to the processing needed for sort:
>> The first naive implementation of the prototype did a read and sort for every Timer that fired ( timers was set to fire for every LHS element timestamp, a property of the use case we was looking at). This worked but was very slow as you would expect, so we changed things to make use of bundle boundaries as a way to reduce the number of sorts, by storing the sorted list into a static map ( Key-Window as key) for the duration of the bundle. It was very effective for the use case, but added a lot of technical debt and hard to figure out potential bugs...
>>
>> Note that when you push the sorting from user code to runner (even for streaming), then a much more efficient implementation appears, because you can read and sort all elements from the sort buffer *up to the input watermark*. This is much bigger "hop" the per element and therefore is very efficient even with no other optimizations in place. The problem is that in user code, the actual input watermark is unknown (yes, that could be changed, we can add the value of input watermark to OnTimerContext).
>>
>> With regards to memory needs:
>> In our use case, while there was a lot of elements, the elements were small in size and even in batch mode we could process all of the data without OOM. But we would want a generalized solution not to have to rely on this property when in batch mode of course.
>>
>> +1
>>
>> Just a thought Jan as a temporary solution, for your use case, would stripping down the element to just timestamp & joinkey allow the data to fit into memory for the batch processing mode? It would require more work afterwards to add back the other properties ( a lhs and rhs pass I think..) , which could make it prohibitive...?
>>
>> Actually there are workarounds, yes. I'm looking for a generic solution, and because I have implemented the @RequiresTimeSortedInput annotation and I'm using it, I actually don't need any workarounds. :-) I just need a consensus to add this to master, because I don't (obviously) want to keep and maintain that outside Beam.
>>
>> Jan
>>
>> Cheers
>> Reza
>>
>> On Thu, 28 Nov 2019 at 14:12, Kenneth Knowles <[email protected]> wrote:
>>
>>> Yes, I am suggesting to add more intelligent state data structures for just that sort of join. I tagged Reza because his work basically does it, but explicitly pulls a BagState into memory and sorts it. We just need to avoid that. It is the sort of thing that already exists in some engines so there's proof of concept :-). Jan makes the good point that executing the same join in batch you wouldn't use the same algorithm, because the disorder will be unbounded. In Beam you'd want a PTransform that expands differently based on whether the inputs are bounded or unbounded.
>>>
>>> Kenn
>>>
>>> On Tue, Nov 26, 2019 at 4:16 AM David Morávek <[email protected]> wrote:
>>>
>>>> Yes, in batch case with long-term historical data, this would be O(n^2) as it basically a bubble sort. If you have large # of updates for a single key, this would be super expensive.
>>>>
>>>> Kenn, can this be re-implemented with your solution?
>>>>
>>>> On Tue, Nov 26, 2019 at 1:10 PM Jan Lukavský <[email protected]> wrote:
>>>>
>>>>> Functionally yes. But this straightforward solution is not working for me for two main reasons:
>>>>>
>>>>> - it either blows state in batch case or the time complexity of the sort would be O(n^2) (and reprocessing several years of dense time-series data makes it a no go)
>>>>>
>>>>> - it is not reusable for different time-ordering needs, because the logic implemented purely in user-space cannot be transferred to different problem (there are two states needed, one for buffer, the other for user-state) and extending DoFns does not work (cannot create abstract SortedDoFn, because of the state annotation definitions)
>>>>>
>>>>> Jan
>>>>> On 11/26/19 12:56 PM, David Morávek wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I think what Jan has in mind would look something like this <https://gist.github.com/dmvk/3ea32eb36c6406fa72d70b9b1df1d878>, if implemented in user code. Am I right?
>>>>>
>>>>> D.
>>>>>
>>>>> On Tue, Nov 26, 2019 at 10:23 AM Jan Lukavský <[email protected]> wrote:
>>>>>
>>>>>> On 11/25/19 11:45 PM, Kenneth Knowles wrote:
>>>>>>
>>>>>> On Mon, Nov 25, 2019 at 1:56 PM Jan Lukavský <[email protected]> wrote:
>>>>>>
>>>>>>> Hi Rui,
>>>>>>>
>>>>>>> > Hi Kenn, you think stateful DoFn based join can emit joined rows that never to be retracted because in stateful DoFn case joined rows will be controlled by timers and emit will be only once? If so I will agree with it. Generally speaking, if only emit once is the factor of needing retraction or not.
>>>>>>>
>>>>>>> that would imply buffering elements up until watermark, then sorting and so reduces to the option a) again, is that true? This also has to deal with allowed lateness, that would mean, that with allowed lateness greater than zero, there can still be multiple firings and so retractions are needed.
>>>>>>>
>>>>>> Specifically, when I say "bi-temporal join" I mean unbounded-to-unbounded join where one of the join conditions is that elements are within event time distance d of one another. An element at time t will be saved until time t + 2d and then garbage collected. Every matching pair can be emitted immediately.
>>>>>>
>>>>>> OK, this might simplify things a little. Is there a design doc for that? If there are multiple LHS elements within event time distance from RHS element, which one should be joined? I suppose all of them, but that is not "(time-varying-)relational" join semantics. In that semantics only the last element must be joined, because that is how a (classical) relational database would see the relation at time T (the old record would have been overwritten and not be part of the output). Because of the time distance constraint this is different from the join I have in mind, because that simply joins every LHS element(s) to most recent RHS element(s) and vice versa, without any additional time constraints (that is the RHS "update" can happen arbitrarily far in past).
>>>>>>
>>>>>> Jan
>>>>>>
>>>>>> In the triggered CoGBK + join-product implementation, you do need retractions as a model concept. But you don't need full support, since they only need to be shipped as deltas and only from the CoGBK to the join-product transform where they are all consumed to create only positive elements. Again a delay is not required; this yields correct results with the "always" trigger.
>>>>>>
>>>>>> Neither case requires waiting or time sorting a whole buffer. The bi-temporal join requires something more, in a way, since you need to query by time range and GC time prefixes.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> Jan
>>>>>>> On 11/25/19 10:17 PM, Rui Wang wrote:
>>>>>>>
>>>>>>> On Mon, Nov 25, 2019 at 11:29 AM Jan Lukavský <[email protected]> wrote:
>>>>>>>
>>>>>>>> On 11/25/19 7:47 PM, Kenneth Knowles wrote:
>>>>>>>>
>>>>>>>> On Sun, Nov 24, 2019 at 12:57 AM Jan Lukavský <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> I can put down a design document, but before that I need to clarify some things for me. I'm struggling to put all of this into a bigger picture. Sorry if the arguments are circulating, but I didn't notice any proposal of how to solve these. If anyone can disprove any of this logic it would be very much appreciated as I might be able to get from a dead end:
>>>>>>>>>
>>>>>>>>> a) in the bi-temporal join you can either buffer until watermark, or emit false data that has to be retracted
>>>>>>>>>
>>>>>>>> This is not the case. A stateful DoFn based join can emit immediately joined rows that will never need to be retracted. The need for retractions has to do with CoGBK-based implementation of a join.
>>>>>>>>
>>>>>>>> I fail to see how this could work. If I emit joined rows immediately without waiting for watermark to pass, I can join two elements, that don't belong to each other, because later can arrive element with lower time distance, that should have been joint in the place of the previously emitted one. This is wrong result that has to be retracted. Or what I'm missing?
>>>>>>>>
>>>>>>> Hi Kenn, you think stateful DoFn based join can emit joined rows that never to be retracted because in stateful DoFn case joined rows will be controlled by timers and emit will be only once? If so I will agree with it. Generally speaking, if only emit once is the factor of needing retraction or not.
>>>>>>>
>>>>>>> In the past brainstorming, even having retractions ready, streaming join with windowing are likely be implemented by a style of CoGBK + stateful DoFn.
>>>>>>>
>>>>>>>> I suggest that you work out the definition of the join you are interested in, with a good amount of mathematical rigor, and then consider the ways you can implement it. That is where a design doc will probably clarify things.
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>>> b) until retractions are 100% functional (and that is sort of holy grail for now), then the only solution is using a buffer holding data up to watermark *and then sort by event time*
>>>>>>>>>
>>>>>>>>> c) even if retractions were 100% functional, there would have to be special implementation for batch case, because otherwise this would simply blow up downstream processing with insanely many false additions and subsequent retractions
>>>>>>>>>
>>>>>>>>> Property b) means that if we want this feature now, we must sort by event time and there is no way around. Property c) shows that even in the future, we must make (in certain cases) distinction between batch and streaming code paths, which seems weird to me, but it might be an option. But still, there is no way to express this join in batch case, because it would require either buffering (up to) whole input on local worker (doesn't look like viable option) or provide a way in user code to signal the need for ordering of data inside GBK (and we are there again :)). Yes, we might shift this need from stateful dofn to GBK like
>>>>>>>>>
>>>>>>>>> input.apply(GroupByKey.sorted())
>>>>>>>>>
>>>>>>>>> I cannot find a good reasoning why this would be better than giving this semantics to (stateful) ParDo.
>>>>>>>>>
>>>>>>>>> Maybe someone can help me out here?
>>>>>>>>>
>>>>>>>>> Jan
>>>>>>>>> On 11/24/19 5:05 AM, Kenneth Knowles wrote:
>>>>>>>>>
>>>>>>>>> I don't actually see how event time sorting simplifies this case much. You still need to buffer elements until they can no longer be matched in the join, and you still need to query that buffer for elements that might match. The general "bi-temporal join" (without sorting) requires one new state type and then it has identical API, does not require any novel data structures or reasoning, yields better latency (no sort buffer delay), and discards less data (no sort buffer cutoff; watermark is better). Perhaps a design document about this specific case would clarify.
>>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>> On Fri, Nov 22, 2019 at 10:08 PM Jan Lukavský <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> I didn't want to go too much into detail, but to describe the idea roughly (ignoring the problem of different window fns on both sides to keep it as simple as possible):
>>>>>>>>>>
>>>>>>>>>> rhs ----- \
>>>>>>>>>>            flatten (on global window) ---- stateful par do (sorted by event time) ---- output
>>>>>>>>>> lhs ----- /
>>>>>>>>>>
>>>>>>>>>> If we can guarantee event time order arrival of events into the stateful pardo, then the whole complexity reduces to keep current value of left and right element and just flush them out each time there is an update. That is the "knob" is actually when watermark moves, because it is what tells the join operation that there will be no more (not late) input. This is very, very simplified, but depicts the solution. The "classical" windowed join reduces to this if all data in each window is projected onto window end boundary. Then there will be a cartesian product, because all the elements have the same timestamp. I can put this into a design doc with all the details, I was trying to find out if there is or was any effort around this.
>>>>>>>>>>
>>>>>>>>>> I was in touch with Reza in the PR #9032, I think that it currently suffers from problems with running this on batch.
>>>>>>>>>>
>>>>>>>>>> I think I can even (partly) resolve the retraction issue (for joins), as described on the thread [1]. Shortly, there can be two copies of the stateful dofn, one running at watermark and the other at (watermark - allowed lateness). One would produce ON_TIME (maybe wrong) results, the other would produce LATE but correct ones. Being able to compare them, the outcome would be that it would be possible to retract the wrong results.
>>>>>>>>>>
>>>>>>>>>> Yes, this is also about providing more evidence of why I think event-time sorting should be (somehow) part of the model. :-)
>>>>>>>>>>
>>>>>>>>>> Jan
>>>>>>>>>>
>>>>>>>>>> [1] https://lists.apache.org/thread.html/a736ebd6b0913d2e06dfa54797716d022adf2c45140530d5c3bd538d@%3Cdev.beam.apache.org%3E
>>>>>>>>>> On 11/23/19 5:54 AM, Kenneth Knowles wrote:
>>>>>>>>>>
>>>>>>>>>> +Mikhail Gryzykhin <[email protected]> +Rui Wang <[email protected]> +Reza Rokni <[email protected]> who have all done some investigations here.
>>>>>>>>>>
>>>>>>>>>> On Fri, Nov 22, 2019 at 11:48 AM Jan Lukavský <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> On 11/22/19 7:54 PM, Reuven Lax wrote:
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Nov 22, 2019 at 10:19 AM Jan Lukavský <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Reuven,
>>>>>>>>>>>>
>>>>>>>>>>>> I didn't investigate that particular one, but looking into that now, it looks that is (same as the "classic" join library) builds around CoGBK. Is that correct? If yes, then it essentially means that it:
>>>>>>>>>>>>
>>>>>>>>>>>> - works only for cases where both sides have the same windowfn (that is limitation of Flatten that precedes CoGBK)
>>>>>>>>>>>>
>>>>>>>>>>> Correct. Did you want to join different windows? If so what are the semantics? If the lhs has FixedWindows and the rhs has SessionWindows, what do you want the join semantics to be? The only thing I could imagine would be for the user to provide some function telling the join how to map the windows together, but that could be pretty complicated.
>>>>>>>>>>>
>>>>>>>>>>> I don't want to go too far into details, but generally both lhs and rhs can be put onto time line and then full join can be defined as each pair of (lhs, first preceding rhs) and (rhs, first preceding lhs). Then the end of window is semantically just clearing the joined value (setting it to null, thus at the end of window there will be pair (lhs, null) or (null, rhs) in case of full outer join). This way any combination of windows is possible, because all window does is that it "scopes" validity of respective values (lhs, rhs).
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I think it is very valid to hope to do a join in the sense of a relational join where it is row-to-row. In this case, Beam's concept of windowing may or may not make sense. It is just a tool for the job. It is just a grouping key that provides a time when state can be deleted. So I would say your use case is more global window to global window join. That is what I think of as a true stream-to-stream join anyhow. You probably don't want to wait forever for output. So you'll need to use some knob other than Beam windows or triggers.
>>>>>>>>>>
>>>>>>>>>> Reza has prototyped a join like you describe here: https://github.com/apache/beam/pull/9032
>>>>>>>>>>
>>>>>>>>>> If your join condition explicitly includes the event time distance between elements, then it could "just work". If that isn't really part of your join condition, then you will have to see this restriction as a "knob" that you tweak on your results.
>>>>>>>>>>
>>>>>>>>>>>> - when using global window, there has to be trigger and (afaik) there is no trigger that would guarantee firing after each data element (for early panes) (because triggers are there to express cost-latency tradeoff, not semantics)
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Can you explain the use case where this matters? If you do trigger elementCountAtLeast(1) on the join, then the consumer will simply see a continuous stream of outputs. I'm not sure I understand why the consumer cares that some of those outputs were in a pane that really held 3 outputs instead of 1.
>>>>>>>>>>>
>>>>>>>>>>> What I'm trying to solve is basically this:
>>>>>>>>>>>
>>>>>>>>>>> - lhs is event stream
>>>>>>>>>>>
>>>>>>>>>>> - rhs is stream of a "state updates"
>>>>>>>>>>>
>>>>>>>>>>> purpose of the join is "take each event, pair it with currently valid state and produce output and possibly modified state". I cannot process two events at a time, because first event can modify the state and the subsequent event should see this. It is not a "simple" stateful pardo either, because the state can be modified externally (not going into too much detail here, but e.g. by writing into kafka topic).
>>>>>>>>>>>
>>>>>>>>>> Reuven's explanation is missing some detail. If the CoGBK is in discarding mode, then it will miss join results. If the CoGBK is in accumulating mode, it will duplicate join results. This is a known problem and the general solution is retractions.
>>>>>>>>>>
>>>>>>>>>> Basically, CoGBK-based joins just don't work with triggers until we have retractions.
>>>>>>>>>>
>>>>>>>>>>>> Moreover, I'd like to define the join semantics so that when there are available elements from both sides, the fired pane should be ON_TIME, not EARLY. That essentially means that the fully general case would not be built around (Co)GBK, but stateful ParDo. There are specific options where this fully general case "degrades" into forms that can be efficiently expressed using (Co)GBK, that is true.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> BTW building this around stateful DoFn might be a better fit. The main reason I didn't is because we would need a good distributed MapState (something discussed fairly recently on the list), and that is not yet built. Once we had that, I might be inclined to rewrite this join on stateful DoFn.
>>>>>>>>>>>
>>>>>>>>>>> Yes, the sorted state helps for streaming case. But I'd be careful about that for batch case, where this might lead to high pressure on the state (and InMemoryStateInternals might OOME for instance).
>>>>>>>>>>>
>>>>>>>>>>> However can you explain what you are expecting from the pane? An EARLY pane simply means that we are producing output before the end of the window. If you are in the global window triggering every element, then every output is EARLY. It might seem weird if you are interpreting EARLY as "outputting data that isn't ready," however that's not what EARLY is defined to be. Any change to the pane semantics would be a major breaking change to very fundamental semantics.
>>>>>>>>>>>
>>>>>>>>>>> I wonder if you are really objecting to the name EARLY and ON_TIME? Maybe we would've been better off tagging it BEFORE_WINDOW_END instead of EARLY, to make it clear what is meant?
>>>>>>>>>>>
>>>>>>>>>>> Essentially I don't object anything here. I'm missing solution to the "event vs. state" join described above. I was thinking about how to make these types of problems more user friendly and it essentially leads to creating a somewhat more generic semantics of join, where end-of-window is converted into "'value-delete events" and then just joining by the "previous" or "valid" value (yes, this relates to validity windows mentioned on Beam Summit Europe). It actually turns out that with some work we could define quite "naturally" a join on two streams with global window and no trigger. It would even function with lowest latency possible (but yes, with the highest expenses, it is actually the introduction of (same!) windows that enable certain optimizations). It the correctly defines semantics for different windows, although the result would be (probably unexpectedly) windowed using global window. But that doesn't seem to be any breaking change, because it is currently not possible (any such pipeline will not be validated).
>>>>>>>>>>>
>>>>>>>>>>> Maybe for reference, the unwindowed join would be what is described here [1]
>>>>>>>>>>>
>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-KStream-KTableJoin
>>>>>>>>>>>
>>>>>>>>>>>> Jan
>>>>>>>>>>>> On 11/22/19 6:47 PM, Reuven Lax wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Have you seen the Join library that is part of schemas? I'm curious whether this fits your needs, or there's something lacking there.
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Nov 22, 2019 at 12:31 AM Jan Lukavský <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> based on roadmap [1], we would like to define and implement a full set of (unified) stream-stream joins. That would include:
>>>>>>>>>>>>>
>>>>>>>>>>>>> - joins (left, right, full outer) on global window with "immediate trigger"
>>>>>>>>>>>>>
>>>>>>>>>>>>> - joins with different windowing functions on left and right side
>>>>>>>>>>>>>
>>>>>>>>>>>>> The approach would be to define these operations in a natural way, so that the definition is aligned with how current joins work (same windows, cartesian product of values with same keys, output timestamp projected to the end of window, etc.). Because this should be a generic approach, this effort should probably be part of join library, that can the be reused by other components, too (e.g. SQL).
>>>>>>>>>>>>>
>>>>>>>>>>>>> The question is - is (or was) there any effort that we can build upon? Or should this be designed from scratch?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Jan
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1] https://beam.apache.org/roadmap/euphoria/
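
For the batch side, the reply at the top of this thread notes that a condition like "WHERE abs(x.timestamp - y.timestamp) < n" can be evaluated by partitioning rather than sorting, because for bounded data the timestamp is just another field. A minimal sketch of one such partitioning technique follows: bucket the timestamps into ranges of width n and compare each element only against neighbouring buckets. It is plain Java with hypothetical names, assumes n > 0, and is not how any particular runner implements joins.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical batch band join on timestamps; not a Beam API. */
class BandJoinSketch {
  /** Returns {x, y} timestamp pairs with abs(x - y) < n, assuming n > 0. */
  static List<long[]> join(long[] xs, long[] ys, long n) {
    // Partition ys by bucket index floor(y / n); input order does not matter.
    Map<Long, List<Long>> buckets = new HashMap<>();
    for (long y : ys) {
      buckets.computeIfAbsent(Math.floorDiv(y, n), k -> new ArrayList<>()).add(y);
    }

    List<long[]> out = new ArrayList<>();
    for (long x : xs) {
      long b = Math.floorDiv(x, n);
      // Any y with abs(x - y) < n must fall in bucket b - 1, b, or b + 1.
      for (long k = b - 1; k <= b + 1; k++) {
        for (long y : buckets.getOrDefault(k, Collections.emptyList())) {
          if (Math.abs(x - y) < n) {
            out.add(new long[] {x, y});
          }
        }
      }
    }
    return out;
  }
}
```

In a distributed setting the bucket index would play the role of (part of) the grouping key, with each element replicated to its neighbouring buckets; neither input needs to be ordered by time.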
