> Aaron - do you have the information you need to implement your sink? My impression is that you have quite a good grasp of the issues even before you asked.
Yes I do, thank you. I really appreciate the thorough help from everyone.

On Wed, Dec 4, 2019 at 9:41 AM Jan Lukavský <[email protected]> wrote:

Hi Kenn,

On 12/4/19 5:38 AM, Kenneth Knowles wrote:
> Jan - let's try to defrag the threads on your time sorting proposal. This thread may have useful ideas, but I want to focus on helping Aaron in this thread. You can link to this thread from other threads or from a design doc. Does this seem OK to you?

Sure. :-)

I actually think the best thread to continue the discussion would be [1]. The reason this discussion probably got fragmented is that the other threads seem to die out without any conclusion. :-(

Jan

[1] https://lists.apache.org/thread.html/e2f729c7cea22553fc34421d4547132fa1c2ec01035eb4fb1a426873%40%3Cdev.beam.apache.org%3E

> Aaron - do you have the information you need to implement your sink? My impression is that you have quite a good grasp of the issues even before you asked.
>
> Kenn

On Wed, Nov 27, 2019 at 3:05 AM Jan Lukavský <[email protected]> wrote:

> Trigger firings can have decreasing event timestamps w/ the minimum timestamp combiner*. I do think the issue at hand is best analyzed in terms of the explicit ordering on panes. And I do think we need to have an explicit guarantee or annotation strong enough to describe a sink that is correct under all allowed runners. Today an antagonistic runner could probably break a lot of things.

Thanks for this insight. I didn't know about the relation between trigger firing (event) time - which is always non-decreasing - and the resulting timestamp of the output pane - which can be affected by the timestamp combiner and can decrease in the cases you describe. What does correlate with the pane index at all times is the processing time of the trigger firings. Would you say that the "annotation that would guarantee ordering of panes" could be viewed as a time-ordering annotation with an additional time domain (event time or processing time)? Could these two then be viewed as a single annotation with some distinguishing parameter?

@RequiresTimeSortedInput(Domain.PANE_INDEX | Domain.EVENT_TIME)

?

Event time should probably be made the default, because that information is accessible with every WindowedValue, while the pane index is available only after a GBK (or, more generally, might be available after any keyed sequential operation, but is missing after a source, for instance).

Jan
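To make Jan's proposal concrete, here is a rough sketch of how such an annotated DoFn might look. Note the Domain parameter is purely hypothetical - it exists only in this proposal - while the plain @RequiresTimeSortedInput annotation (event-time order only) is what later landed in the Beam Java SDK:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // Hypothetical sketch only: the Domain parameter from the proposal does
    // not exist in Beam; the plain @RequiresTimeSortedInput annotation
    // (event-time order only) is what the Java SDK later added.
    class OrderedSink extends DoFn<KV<String, Long>, Void> {
      @RequiresTimeSortedInput // proposed: (Domain.PANE_INDEX | Domain.EVENT_TIME)
      @ProcessElement
      public void process(@Element KV<String, Long> element) {
        // Per key, elements arrive here sorted by event time (today's
        // semantics); under the proposal the domain would be selectable.
      }
    }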
On 11/27/19 1:32 AM, Kenneth Knowles wrote:

> If we want to ensure "trigger firing ordering", we can use the pane index, that is correct. But that is actually equivalent to sorting by event time, because pane index order will be (nearly) the same as event time order.

Trigger firings can have decreasing event timestamps w/ the minimum timestamp combiner*. I do think the issue at hand is best analyzed in terms of the explicit ordering on panes. And I do think we need to have an explicit guarantee or annotation strong enough to describe a sink that is correct under all allowed runners. Today an antagonistic runner could probably break a lot of things.

Kenn

*In fact, they can decrease via the "maximum" timestamp combiner, because timestamp combiners actually apply only to the elements in that particular pane. This is weird, and maybe a design bug, but good to know about.

On Tue, Nov 26, 2019 at 1:00 AM Jan Lukavský <[email protected]> wrote:

> I will not try to formalize this notion in this email. But I will note that since it is universally assured, it would be zero cost and significantly safer to formalize it and add an annotation noting it was required. It has nothing to do with event time ordering, only trigger firing ordering.

I cannot agree with the last sentence (and I'm really not doing this on purpose :-)). Panes generally arrive out of order, as mentioned several times in the discussions linked from this thread. If we want to ensure "trigger firing ordering", we can use the pane index, that is correct. But that is actually equivalent to sorting by event time, because pane index order will be (nearly) the same as event time order. This is due to the fact that pane index and event time correlate (both are monotonic).

The pane index "only" solves the issue of preserving ordering even in the case where there are multiple firings within the same timestamp (regardless of granularity). This was mentioned in the initial discussion about event time ordering, and is part of the design doc - users should be allowed to provide a UDF for extracting a time-correlated ordering field (which means the ability to choose a preferred, or authoritative, observer that assigns unambiguous ordering to events). Examples might include Kafka offsets, or any queue index for that matter. This is not yet implemented, but could (should) be in the future.

The only case where these two things are (somewhat) different is the one mentioned by @Steve - if the output is a stateless ParDo, which will get fused. But that is only because the processing is single-threaded per key, and therefore the ordering is implied by timer ordering (and careful here - many runners don't have this ordering 100% correct as of now; this problem luckily appears only when there are multiple timers per key). Moreover, if there were a failure, the output might (would) go back in time anyway. If there were a shuffle operation after the GBK/Combine, the ordering would no longer be guaranteed and would have to be explicitly taken care of.

Last note, I must agree with @Rui that all these discussions are very much related to retractions (precisely, the ability to implement them).

Jan

On 11/26/19 7:34 AM, Kenneth Knowles wrote:

Hi Aaron,

Another insightful observation.

Whenever an aggregation (GBK / Combine per key) has a trigger firing, there is a per-key sequence number attached. It is included in the metadata known as "PaneInfo" [1]. The value of PaneInfo.getIndex() is colloquially referred to as the "pane index". You can also make use of the "on time index" if you like. The best way to access this metadata is to add a parameter of type PaneInfo to your DoFn's @ProcessElement method [2]. This works for stateful or stateless DoFns.

Most of Beam's IO connectors do not explicitly enforce that outputs occur in pane index order, but instead rely on the hope that the runner delivers panes in order to the sink. IMO this is dangerous, but it has not yet caused a known issue. In practice, each "input key to output key" path through a pipeline's logic does preserve order for all existing runners AFAIK; it is the formalization that is missing. It is related to an observation by +Rui Wang <[email protected]> that processing retractions requires the same key-to-key ordering.

I will not try to formalize this notion in this email. But I will note that since it is universally assured, it would be zero cost and significantly safer to formalize it and add an annotation noting that it is required. It has nothing to do with event time ordering, only trigger firing ordering.

Kenn

[1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
[2] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L557
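A minimal sketch of accessing PaneInfo the way Kenn describes; the element type and the sink idea in the comments are illustrative assumptions, not code from the thread:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.windowing.PaneInfo;
    import org.apache.beam.sdk.values.KV;

    // Sketch: declare PaneInfo as a @ProcessElement parameter and the
    // runner supplies the firing metadata for the current pane.
    class PaneAwareSink extends DoFn<KV<String, Long>, Void> {
      @ProcessElement
      public void process(@Element KV<String, Long> element, PaneInfo pane) {
        long paneIndex = pane.getIndex();          // per-key firing sequence number
        PaneInfo.Timing timing = pane.getTiming(); // EARLY, ON_TIME, or LATE
        // Illustrative idea: store paneIndex next to the written value and
        // have the store reject writes carrying a smaller index than the
        // last one applied for this key.
      }
    }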
On Mon, Nov 25, 2019 at 4:06 PM Pablo Estrada <[email protected]> wrote:

The blog posts on stateful and timely computation with Beam should help clarify a lot about how to use state and timers to do this:
https://beam.apache.org/blog/2017/02/13/stateful-processing.html
https://beam.apache.org/blog/2017/08/28/timely-processing.html

You'll see there how there's an implicit per-single-element grouping for each key, so state and timers should support your use case very well.

Best
-P.

On Mon, Nov 25, 2019 at 3:47 PM Steve Niemitz <[email protected]> wrote:

If you have a pipeline that looks like Input -> GroupByKey -> ParDo, then while it is not guaranteed, in practice the sink will observe the trigger firings in order (per key), since it'll be fused to the output of the GBK operation (in all runners I know of).

There have been a couple of threads about trigger ordering on the list recently that might have more information:

https://lists.apache.org/thread.html/b61a908289a692dbd80dd6a869759eacd45b308cb3873bfb77c4def6@%3Cdev.beam.apache.org%3E
https://lists.apache.org/thread.html/20d11046d26174969ef44a781e409a1cb9f7c736e605fa40fdf98397@%3Cuser.beam.apache.org%3E

On Mon, Nov 25, 2019 at 5:52 PM Aaron Dixon <[email protected]> wrote:

@Jan @Pablo Thank you

@Pablo In this case it's a single globally windowed Combine/perKey, triggered per element. Keys are few (client accounts), so they can live forever.

It looks like just by virtue of using a stateful ParDo I could get this final execution to be "serialized" per key. (Then I could simply do the compare-and-swap using Beam's state mechanism to keep track of the "latest trigger timestamp", instead of having to orchestrate the compare-and-swap in the target store :thinking:.)

On Mon, Nov 25, 2019 at 4:14 PM Jan Lukavský <[email protected]> wrote:

One addition, to make the list of options exhaustive: there is probably one more option

c) create a ParDo keyed by the primary key of your sink, cache the last write in there, and compare it locally, without the need to query the database.

It would still need some timer to clear values after watermark + allowed lateness, because otherwise you would have to cache your whole database on the workers. But because you don't need actual ordering, just the most recent value (if I got it right), this might be an option.

Jan
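A sketch of the state-backed compare-and-swap Aaron describes above (which is essentially Jan's option c): keep the largest event timestamp seen per key in ValueState and skip any firing that would move the sink backwards. The element types and the writeToStore call are placeholder assumptions, not code from the thread:

    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.ValueState;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.joda.time.Instant;

    // Sketch: a stateful ParDo serializes processing per key, so the
    // compare-and-swap can live in Beam state instead of the target store.
    class LatestWinsSink extends DoFn<KV<String, Long>, Void> {
      @StateId("latestTs")
      private final StateSpec<ValueState<Long>> latestTsSpec = StateSpecs.value();

      @ProcessElement
      public void process(
          @Element KV<String, Long> element,
          @Timestamp Instant timestamp,
          @StateId("latestTs") ValueState<Long> latestTs) {
        Long last = latestTs.read();
        if (last == null || timestamp.getMillis() > last) {
          latestTs.write(timestamp.getMillis());
          // writeToStore(element.getKey(), element.getValue()); // placeholder
        }
        // Otherwise this pane is older than what was already written; skip it.
      }
    }

As Jan notes, a production version would also set a timer for watermark + allowed lateness to garbage-collect state for idle keys; with a few long-lived keys, as in Aaron's case, this matters less.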
On 11/25/19 10:53 PM, Jan Lukavský wrote:

Hi Aaron,

maybe someone else will give another option, but if I understand correctly what you want to solve, then you essentially have to do either:

a) use the compare & swap mechanism in the sink you described, or

b) use a buffer to hold elements inside the outputting ParDo and only output them when the watermark passes (using a timer).

There is actually an ongoing discussion about how to make option b) user-friendly and part of Beam itself, but currently there is no out-of-the-box solution for that.

Jan

On 11/25/19 10:27 PM, Aaron Dixon wrote:

Suppose I trigger a Combine per-element (in a high-volume stream) and use a ParDo as a sink.

I assume there is no guarantee about the order in which my ParDo will see these triggers, especially as it processes in parallel anyway.

That said, my sink writes to a db or cache, and I would not like the cache to ever regress its value to something "before" what it has already written.

Is the best way to solve this problem to always write the event time in the cache and do a compare-and-swap, only updating the sink if the triggered value in hand is later than the target value?

Or is there a better way to guarantee that my ParDo sink will process elements in order? (E.g., if I can give up per-event/real-time, then a delay-based trigger would probably be sufficient, I imagine.)

Thanks for advice!
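For completeness, a hedged sketch of Jan's option b): buffer elements in BagState and flush them from an event-time timer once the watermark passes the end of the window. Names and types are illustrative assumptions; a real version would also account for allowed lateness:

    import org.apache.beam.sdk.state.BagState;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.TimeDomain;
    import org.apache.beam.sdk.state.Timer;
    import org.apache.beam.sdk.state.TimerSpec;
    import org.apache.beam.sdk.state.TimerSpecs;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.values.KV;

    // Sketch: hold elements until the watermark says no earlier data can
    // arrive, then emit them in one batch per key and window.
    class BufferUntilWatermark extends DoFn<KV<String, Long>, KV<String, Long>> {
      @StateId("buffer")
      private final StateSpec<BagState<KV<String, Long>>> bufferSpec = StateSpecs.bag();

      @TimerId("flush")
      private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

      @ProcessElement
      public void process(
          @Element KV<String, Long> element,
          BoundedWindow window,
          @StateId("buffer") BagState<KV<String, Long>> buffer,
          @TimerId("flush") Timer flush) {
        buffer.add(element);
        flush.set(window.maxTimestamp()); // fires once the watermark passes the window
      }

      @OnTimer("flush")
      public void onFlush(
          OnTimerContext context,
          @StateId("buffer") BagState<KV<String, Long>> buffer) {
        for (KV<String, Long> kv : buffer.read()) {
          context.output(kv); // emit buffered values now that ordering is settled
        }
        buffer.clear();
      }
    }

This trades per-event latency for ordering, which matches Aaron's remark that a delay-based approach would be acceptable if per-event/real-time output is given up.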
