The blog posts on stateful and timely computation with Beam should help clarify a lot about how to use state and timers to do this:

https://beam.apache.org/blog/2017/02/13/stateful-processing.html
https://beam.apache.org/blog/2017/08/28/timely-processing.html
You'll see there how there's an implicit per-single-element grouping for each key, so state and timers should support your use case very well.

Best
-P.

On Mon, Nov 25, 2019 at 3:47 PM Steve Niemitz <sniem...@apache.org> wrote:

> If you have a pipeline that looks like Input -> GroupByKey -> ParDo, then
> while it is not guaranteed, in practice the sink will observe the trigger
> firings in order (per key), since it'll be fused to the output of the GBK
> operation (in all runners I know of).
>
> There have been a couple of threads about trigger ordering on the list
> recently that might have more information:
>
> https://lists.apache.org/thread.html/b61a908289a692dbd80dd6a869759eacd45b308cb3873bfb77c4def6@%3Cdev.beam.apache.org%3E
> https://lists.apache.org/thread.html/20d11046d26174969ef44a781e409a1cb9f7c736e605fa40fdf98397@%3Cuser.beam.apache.org%3E
>
> On Mon, Nov 25, 2019 at 5:52 PM Aaron Dixon <atdi...@gmail.com> wrote:
>
>> @Jan @Pablo Thank you
>>
>> @Pablo In this case it's a single globally windowed Combine.perKey,
>> triggered per element. Keys are few (client accounts), so they can live
>> forever.
>>
>> It looks like, just by virtue of using a stateful ParDo, I could get
>> this final execution to be "serialized" per key. (Then I could simply do
>> the compare-and-swap using Beam's state mechanism to keep track of the
>> "latest trigger timestamp", instead of having to orchestrate the
>> compare-and-swap in the target store :thinking:.)
>>
>> On Mon, Nov 25, 2019 at 4:14 PM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> One addition, to make the list of options exhaustive: there is
>>> probably one more option.
>>>
>>> c) Create a ParDo keyed by the primary key of your sink, cache the
>>> last write there, and compare against it locally, without the need to
>>> query the database.
>>>
>>> It would still need some timer to clear values after watermark +
>>> allowed lateness, because otherwise you would have to cache your whole
>>> database on workers.
>>> But because you don't need actual ordering, just the most recent value
>>> (if I got it right), this might be an option.
>>>
>>> Jan
>>>
>>> On 11/25/19 10:53 PM, Jan Lukavský wrote:
>>> > Hi Aaron,
>>> >
>>> > Maybe someone else will give another option, but if I understand
>>> > correctly what you want to solve, then you essentially have to do
>>> > either:
>>> >
>>> > a) use the compare-and-swap mechanism in the sink, as you described,
>>> > or
>>> >
>>> > b) buffer elements inside the outputting ParDo and only output them
>>> > when the watermark passes (using a timer).
>>> >
>>> > There is actually an ongoing discussion about how to make option b)
>>> > user-friendly and part of Beam itself, but currently there is no
>>> > out-of-the-box solution for that.
>>> >
>>> > Jan
>>> >
>>> > On 11/25/19 10:27 PM, Aaron Dixon wrote:
>>> >> Suppose I trigger a Combine per element (in a high-volume stream)
>>> >> and use a ParDo as a sink.
>>> >>
>>> >> I assume there is no guarantee about the order in which my ParDo
>>> >> will see these trigger firings, especially as it processes in
>>> >> parallel anyway.
>>> >>
>>> >> That said, my sink writes to a db or cache, and I would not like
>>> >> the cache to ever regress its value to something "before" what it
>>> >> has already written.
>>> >>
>>> >> Is the best way to solve this problem to always write the event
>>> >> time in the cache and do a compare-and-swap, only updating the sink
>>> >> if the triggered value in hand is later than the target value?
>>> >>
>>> >> Or is there a better way to guarantee that my ParDo sink will
>>> >> process elements in order? (E.g., if I can give up per-event/
>>> >> real-time processing, then a delay-based trigger would probably be
>>> >> sufficient, I imagine.)
>>> >>
>>> >> Thanks for advice!
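[Editor's note] The compare-and-swap on a "latest trigger timestamp", kept in Beam state rather than in the target store, can be sketched in plain Python. The `LatestTimestampCAS` class below is a hypothetical stand-in for the per-key logic a stateful DoFn would run with something like a `ValueState`; it is not Beam API, just the decision rule:

```python
class LatestTimestampCAS:
    """Per-key compare-and-swap: only emit (i.e. write to the sink) when
    an element's event time is newer than the last one emitted for that
    key, so the sink value never regresses."""

    def __init__(self):
        # In a real stateful DoFn this would be per-key ValueState
        # managed by the runner; a dict plays that role in this sketch.
        self._latest = {}

    def process(self, key, value, event_time):
        last = self._latest.get(key)
        if last is not None and event_time <= last:
            # Stale or out-of-order trigger firing: drop it.
            return None
        self._latest[key] = event_time
        return (key, value)


cas = LatestTimestampCAS()
assert cas.process("acct-1", 10, 100) == ("acct-1", 10)
assert cas.process("acct-1", 7, 90) is None       # older firing dropped
assert cas.process("acct-1", 12, 110) == ("acct-1", 12)
```

Because a stateful ParDo processes each key's elements one at a time, this check and the state update happen without races, which is what makes the in-pipeline compare-and-swap workable.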
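[Editor's note] Option c) above, caching the last write per primary key and expiring entries once the watermark plus allowed lateness has passed (so workers don't end up caching the whole database), might look roughly like this. The class and the `expire_up_to` method are hypothetical names standing in for what a Beam event-time timer callback would do:

```python
class LastWriteCache:
    """Per-key cache of the last written (event_time, value), with an
    expiry hook that mimics a Beam timer firing at
    watermark + allowed lateness."""

    def __init__(self, allowed_lateness):
        self.allowed_lateness = allowed_lateness
        self._cache = {}  # key -> (event_time, value)

    def maybe_write(self, key, value, event_time):
        """Return True if this element should be written to the sink,
        i.e. it is newer than the cached last write for this key."""
        cached = self._cache.get(key)
        if cached is not None and event_time <= cached[0]:
            return False  # older than what we already wrote
        self._cache[key] = (event_time, value)
        return True

    def expire_up_to(self, watermark):
        """Hypothetical timer callback: drop entries whose event time is
        past watermark + allowed lateness, bounding cache size."""
        cutoff = watermark - self.allowed_lateness
        stale = [k for k, (ts, _) in self._cache.items() if ts < cutoff]
        for key in stale:
            del self._cache[key]


cache = LastWriteCache(allowed_lateness=5)
assert cache.maybe_write("acct-1", "v1", 100) is True
assert cache.maybe_write("acct-1", "v0", 95) is False  # regression avoided
cache.expire_up_to(120)  # watermark well past 100 + lateness
assert "acct-1" not in cache._cache
```

Once an entry has expired, no element older than the cutoff can still arrive (by the definition of allowed lateness), so dropping it is safe.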