The blog posts on stateful and timely processing with Beam should help
clarify how to use state and timers to do this:
https://beam.apache.org/blog/2017/02/13/stateful-processing.html
https://beam.apache.org/blog/2017/08/28/timely-processing.html

You'll see there that a stateful ParDo gives you an implicit per-key,
element-at-a-time grouping, so state and timers should support your use
case very well.
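
As a concrete illustration, here's a minimal sketch of that pattern in Java
(names and types are hypothetical, not from your pipeline): a stateful DoFn
that keeps the latest event timestamp seen per key and drops anything older,
i.e. the compare-and-swap done in Beam state rather than in the target store:

  import org.apache.beam.sdk.state.StateSpec;
  import org.apache.beam.sdk.state.StateSpecs;
  import org.apache.beam.sdk.state.ValueState;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.values.KV;
  import org.joda.time.Instant;

  class LatestPerKeyFn extends DoFn<KV<String, Long>, KV<String, Long>> {

    @StateId("latestTs")
    private final StateSpec<ValueState<Long>> latestTsSpec = StateSpecs.value();

    @ProcessElement
    public void process(
        @Element KV<String, Long> element,
        @Timestamp Instant ts,
        @StateId("latestTs") ValueState<Long> latestTs,
        OutputReceiver<KV<String, Long>> out) {
      Long latest = latestTs.read();
      // Execution is serialized per key, so read-compare-write is safe.
      if (latest == null || ts.getMillis() > latest) {
        latestTs.write(ts.getMillis());
        out.output(element);  // only newer-than-seen values reach the sink
      }
    }
  }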

Best
-P.

On Mon, Nov 25, 2019 at 3:47 PM Steve Niemitz <sniem...@apache.org> wrote:

> If you have a pipeline that looks like Input -> GroupByKey -> ParDo, then,
> while it is not guaranteed, in practice the sink will observe the trigger
> firings in order (per key), since it will be fused to the output of the GBK
> operation (in all runners I know of).
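>
> For reference, a minimal sketch of that shape (transform and fn names are
> illustrative placeholders, not from an actual pipeline):
>
>   PCollection<KV<String, Long>> input = ...;
>   input
>       .apply(GroupByKey.<String, Long>create())  // firings originate here
>       .apply(ParDo.of(new SinkFn()));            // fused to the GBK output,
>                                                  // so each key's firings
>                                                  // arrive in order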
>
> There have also been a couple of threads about trigger ordering on the
> list recently that might have more information:
>
> https://lists.apache.org/thread.html/b61a908289a692dbd80dd6a869759eacd45b308cb3873bfb77c4def6@%3Cdev.beam.apache.org%3E
>
> https://lists.apache.org/thread.html/20d11046d26174969ef44a781e409a1cb9f7c736e605fa40fdf98397@%3Cuser.beam.apache.org%3E
>
>
> On Mon, Nov 25, 2019 at 5:52 PM Aaron Dixon <atdi...@gmail.com> wrote:
>
>> @Jan @Pablo Thank you
>>
>> @Pablo In this case it's a single global windowed Combine/perKey,
>> triggered per element. Keys are few (client accounts) so they can live
>> forever.
>>
>> It looks like just by virtue of using a stateful ParDo I could get this
>> final execution to be "serialized" per key. (Then I could simply do the
>> compare-and-swap using Beam's state mechanism to keep track of the "latest
>> trigger timestamp" instead of having to orchestrate compare-and-swap in the
>> target store :thinking:.)
>>
>>
>>
>> On Mon, Nov 25, 2019 at 4:14 PM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> One addition, to make the list of options exhaustive, there is probably
>>> one more option:
>>>
>>>   c) create a ParDo keyed by the primary key of your sink, cache the
>>> last write in there, and compare it locally, without the need to query
>>> the database.
>>>
>>> It would still need some timer to clear values after watermark + allowed
>>> lateness, because otherwise you would have to cache your whole database
>>> on the workers. But because you don't need actual ordering, just the
>>> most recent value (if I got it right), this might be an option.
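>>>
>>> A rough sketch of c) in Java, with illustrative names (the expiry below
>>> is an assumption; pick one that matches your windowing and allowed
>>> lateness):
>>>
>>>   // (imports from org.apache.beam.sdk.state.* and org.joda.time.* assumed)
>>>   class CompareLocallyFn extends DoFn<KV<String, Long>, KV<String, Long>> {
>>>
>>>     @StateId("lastWrite")
>>>     private final StateSpec<ValueState<Long>> lastWriteSpec =
>>>         StateSpecs.value();
>>>
>>>     @TimerId("gc")
>>>     private final TimerSpec gcSpec =
>>>         TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>>
>>>     @ProcessElement
>>>     public void process(
>>>         @Element KV<String, Long> e,
>>>         @Timestamp Instant ts,
>>>         @StateId("lastWrite") ValueState<Long> lastWrite,
>>>         @TimerId("gc") Timer gc,
>>>         OutputReceiver<KV<String, Long>> out) {
>>>       Long last = lastWrite.read();
>>>       if (last == null || e.getValue() > last) {
>>>         lastWrite.write(e.getValue());
>>>         out.output(e);  // only this write needs to reach the database
>>>       }
>>>       // Garbage-collect the cached value once the watermark passes this
>>>       // element's timestamp plus an assumed allowed lateness of 1 hour.
>>>       gc.set(ts.plus(Duration.standardHours(1)));
>>>     }
>>>
>>>     @OnTimer("gc")
>>>     public void onGc(@StateId("lastWrite") ValueState<Long> lastWrite) {
>>>       lastWrite.clear();
>>>     }
>>>   }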
>>>
>>> Jan
>>>
>>> On 11/25/19 10:53 PM, Jan Lukavský wrote:
>>> > Hi Aaron,
>>> >
>>> > maybe someone else will give another option, but if I understand
>>> > correctly what you want to solve, then you essentially have to do
>>> either:
>>> >
>>> >  a) use the compare & swap mechanism in the sink you described
>>> >
>>> >  b) buffer elements inside the outputting ParDo and only output them
>>> > when the watermark passes (using a timer).
>>> >
>>> > There is actually an ongoing discussion about how to make option b)
>>> > user-friendly and part of Beam itself, but currently there is no
>>> > out-of-the-box solution for that.
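>>> >
>>> > A very rough, hand-rolled sketch of b) (illustrative names; note that
>>> > each set() overwrites the single per-key timer, so this is only the
>>> > skeleton of the idea):
>>> >
>>> >   // (imports from org.apache.beam.sdk.state.* assumed, as above)
>>> >   @StateId("buffer")
>>> >   private final StateSpec<BagState<Long>> bufferSpec = StateSpecs.bag();
>>> >
>>> >   @TimerId("flush")
>>> >   private final TimerSpec flushSpec =
>>> >       TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>> >
>>> >   @ProcessElement
>>> >   public void process(
>>> >       @Element KV<String, Long> e,
>>> >       @Timestamp Instant ts,
>>> >       @StateId("buffer") BagState<Long> buffer,
>>> >       @TimerId("flush") Timer flush) {
>>> >     buffer.add(e.getValue());
>>> >     flush.set(ts);  // fires once the watermark passes this timestamp
>>> >   }
>>> >
>>> >   @OnTimer("flush")
>>> >   public void flush(
>>> >       @StateId("buffer") BagState<Long> buffer,
>>> >       OutputReceiver<Long> out) {
>>> >     for (Long value : buffer.read()) {
>>> >       out.output(value);  // everything here is behind the watermark now
>>> >     }
>>> >     buffer.clear();
>>> >   }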
>>> >
>>> > Jan
>>> >
>>> > On 11/25/19 10:27 PM, Aaron Dixon wrote:
>>> >> Suppose I trigger a Combine per-element (in a high-volume stream) and
>>> >> use a ParDo as a sink.
>>> >>
>>> >> I assume there is no guarantee about the order in which my ParDo will
>>> >> see these trigger firings, especially as it processes in parallel.
>>> >>
>>> >> That said, my sink writes to a db or cache and I would not like the
>>> >> cache to ever regress its value to something "before" what it has
>>> >> already written.
>>> >>
>>> >> Is the best way to solve this problem to always write the event time
>>> >> to the cache and do a compare-and-swap, only updating the sink if the
>>> >> triggered value in hand is later than the target value?
>>> >>
>>> >> Or is there a better way to guarantee that my ParDo sink will process
>>> >> elements in order? (E.g., if I can give up per-event/real-time
>>> >> output, then a delay-based trigger would probably be sufficient, I
>>> >> imagine.)
>>> >>
>>> >> Thanks for advice!
>>>
>>
