> Aaron - do you have the information you need to implement your sink? My
> impression is that you have quite a good grasp of the issues even before
> you asked.

Yes I do, thank you. I really appreciate the thorough help from everyone.

On Wed, Dec 4, 2019 at 9:41 AM Jan Lukavský <[email protected]> wrote:

> Hi Kenn,
> On 12/4/19 5:38 AM, Kenneth Knowles wrote:
>
> Jan - let's try to defrag the threads on your time sorting proposal. This
> thread may have useful ideas but I want to focus on helping Aaron in this
> thread. You can link to this thread from other threads or from a design
> doc. Does this seem OK to you?
>
> sure. :-)
>
> I actually think the best thread to continue the discussion would be [1].
> The reason why this discussion probably got fragmented is that the other
> threads seemed to die out without any conclusion. :-(
>
> Jan
>
> [1]
> https://lists.apache.org/thread.html/e2f729c7cea22553fc34421d4547132fa1c2ec01035eb4fb1a426873%40%3Cdev.beam.apache.org%3E
>
>
> Aaron - do you have the information you need to implement your sink? My
> impression is that you have quite a good grasp of the issues even before
> you asked.
>
> Kenn
>
> On Wed, Nov 27, 2019 at 3:05 AM Jan Lukavský <[email protected]> wrote:
>
>> > Trigger firings can have decreasing event timestamps w/ the minimum
>> > timestamp combiner*. I do think the issue at hand is best analyzed in
>> > terms of the explicit ordering on panes. And I do think we need to have
>> > an explicit guarantee or annotation strong enough to describe a sink
>> > that is correct under all allowed runners. Today an antagonistic runner
>> > could probably break a lot of things.
>>
>> Thanks for this insight. I didn't know about the relation between trigger
>> firing (event) time - which is always non-decreasing - and the resulting
>> timestamp of the output pane - which can be affected by the timestamp
>> combiner and decrease in the cases you describe. What actually correlates
>> with the pane index at all times is the processing time of the trigger
>> firings. Would you say that the "annotation that would guarantee ordering
>> of panes" could be viewed as a time-ordering annotation with an additional
>> time domain (event time, processing time)? Could these two then be viewed
>> as a single one with some distinguishing parameter?
>>
>> @RequiresTimeSortedInput(Domain.PANE_INDEX | Domain.EVENT_TIME)
>>
>> ?
>>
>> Event time should probably be made the default, because that information
>> is accessible with every WindowedValue, while the pane index is available
>> only after a GBK (or, generally, might be available after every keyed
>> sequential operation, but is missing after a source, for instance).
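>>
>> A purely hypothetical sketch of what such a single annotation with a
>> distinguishing parameter might look like (nothing like this exists in
>> Beam today; just to illustrate the idea):
>>
>> import java.lang.annotation.*;
>>
>> // Hypothetical: one annotation covering both orderings, distinguished
>> // by a time-domain parameter, with event time as the default.
>> @Retention(RetentionPolicy.RUNTIME)
>> @Target({ElementType.METHOD, ElementType.TYPE})
>> public @interface RequiresTimeSortedInput {
>>   enum Domain { EVENT_TIME, PANE_INDEX, PROCESSING_TIME }
>>   Domain[] value() default {Domain.EVENT_TIME};
>> }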
>>
>> Jan
>> On 11/27/19 1:32 AM, Kenneth Knowles wrote:
>>
>>
>>
>> On Tue, Nov 26, 2019 at 1:00 AM Jan Lukavský <[email protected]> wrote:
>>
>>> > I will not try to formalize this notion in this email. But I will note
>>> > that since it is universally assured, it would be zero cost and
>>> > significantly safer to formalize it and add an annotation noting it was
>>> > required. It has nothing to do with event time ordering, only trigger
>>> > firing ordering.
>>>
>>> I cannot agree with the last sentence (and I'm really not doing this on
>>> purpose :-)). Panes generally arrive out of order, as mentioned several
>>> times in the discussions linked from this thread. If we want to ensure
>>> "trigger firing ordering", we can use the pane index, that is correct. But
>>> - that is actually equivalent to sorting by event time, because pane index
>>> order will be (nearly) the same as event time order. This is due to the
>>> fact that pane index and event time correlate (both are monotonic).
>>>
>> Trigger firings can have decreasing event timestamps w/ the minimum
>> timestamp combiner*. I do think the issue at hand is best analyzed in
>> terms of the explicit ordering on panes. And I do think we need to have
>> an explicit guarantee or annotation strong enough to describe a sink that
>> is correct under all allowed runners. Today an antagonistic runner could
>> probably break a lot of things.
>>
>> Kenn
>>
>> *In fact, they can decrease via the "maximum" timestamp combiner, because
>> timestamp combiners actually apply only to the elements in that particular
>> pane. This is weird, and maybe a design bug, but good to know about.
>>
>>
>>> The pane index "only" solves the issue of preserving ordering even in
>>> cases where there are multiple firings within the same timestamp
>>> (regardless of granularity). This was mentioned in the initial discussion
>>> about event time ordering, and is part of the design doc - users should
>>> be allowed to provide a UDF for extracting a time-correlated ordering
>>> field (which means the ability to choose a preferred, or authoritative,
>>> observer which assigns unambiguous ordering to events). Examples of this
>>> might include Kafka offsets, or any queue index for that matter. This is
>>> not yet implemented, but could (should) be in the future.
>>>
>>> The only case where these two things are (somewhat) different is the
>>> case mentioned by @Steve - if the output is a stateless ParDo, which will
>>> get fused. But that is only because the processing is single-threaded per
>>> key, and therefore the ordering is implied by timer ordering (and careful
>>> here, many runners don't have this ordering 100% correct as of now - this
>>> problem luckily appears only when there are multiple timers per key).
>>> Moreover, if there were a failure, the output might (would) go back in
>>> time anyway. If there were a shuffle operation after the GBK/Combine, the
>>> ordering would no longer be guaranteed and would have to be explicitly
>>> taken care of.
>>>
>>> Last note, I must agree with @Rui that all these discussions are very
>>> much related to retractions (precisely the ability to implement them).
>>>
>>> Jan
>>> On 11/26/19 7:34 AM, Kenneth Knowles wrote:
>>>
>>> Hi Aaron,
>>>
>>> Another insightful observation.
>>>
>>> Whenever an aggregation (GBK / Combine per key) has a trigger firing,
>>> there is a per-key sequence number attached. It is included in metadata
>>> known as "PaneInfo" [1]. The value of PaneInfo.getIndex() is colloquially
>>> referred to as the "pane index". You can also make use of the "on time
>>> index" if you like. The best way to access this metadata is to add a
>>> parameter of type PaneInfo to your DoFn's @ProcessElement method. This
>>> works for stateful or stateless DoFn.
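>>>
>>> For illustration, a minimal sketch of reading the pane index this way
>>> (Beam Java SDK; the element type and the print are just placeholders):
>>>
>>> import org.apache.beam.sdk.transforms.DoFn;
>>> import org.apache.beam.sdk.transforms.windowing.PaneInfo;
>>> import org.apache.beam.sdk.values.KV;
>>>
>>> class MySinkFn extends DoFn<KV<String, Long>, Void> {
>>>   @ProcessElement
>>>   public void process(@Element KV<String, Long> element, PaneInfo pane) {
>>>     // pane.getIndex() is the per-key sequence number of this firing;
>>>     // pane.getTiming() distinguishes EARLY / ON_TIME / LATE firings.
>>>     System.out.printf("key=%s paneIndex=%d timing=%s%n",
>>>         element.getKey(), pane.getIndex(), pane.getTiming());
>>>   }
>>> }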
>>>
>>> Most of Beam's IO connectors do not explicitly enforce that outputs
>>> occur in pane index order but instead rely on the hope that the runner
>>> delivers panes in order to the sink. IMO this is dangerous but it has not
>>> yet caused a known issue. In practice, each "input key to output key
>>> 'path'" through a pipeline's logic does preserve order for all existing
>>> runners AFAIK, and it is the formalization that is missing. It is related
>>> to an
>>> observation by +Rui Wang <[email protected]> that processing
>>> retractions requires the same key-to-key ordering.
>>>
>>> I will not try to formalize this notion in this email. But I will note
>>> that since it is universally assured, it would be zero cost and
>>> significantly safer to formalize it and add an annotation noting it was
>>> required. It has nothing to do with event time ordering, only trigger
>>> firing ordering.
>>>
>>> Kenn
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
>>> [2]
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L557
>>>
>>>
>>> On Mon, Nov 25, 2019 at 4:06 PM Pablo Estrada <[email protected]>
>>> wrote:
>>>
>>>> The blog posts on stateful and timely computation with Beam should help
>>>> clarify a lot about how to use state and timers to do this:
>>>> https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>>>> https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>>>
>>>> You'll see there how there's an implicit per-single-element grouping
>>>> for each key, so state and timers should support your use case very well.
>>>>
>>>> Best
>>>> -P.
>>>>
>>>> On Mon, Nov 25, 2019 at 3:47 PM Steve Niemitz <[email protected]>
>>>> wrote:
>>>>
>>>>> If you have a pipeline that looks like Input -> GroupByKey -> ParDo,
>>>>> while it is not guaranteed, in practice the sink will observe the trigger
>>>>> firings in order (per key), since it'll be fused to the output of the GBK
>>>>> operation (in all runners I know of).
>>>>>
>>>>> There have been a couple threads about trigger ordering as well on the
>>>>> list recently that might have more information:
>>>>>
>>>>> https://lists.apache.org/thread.html/b61a908289a692dbd80dd6a869759eacd45b308cb3873bfb77c4def6@%3Cdev.beam.apache.org%3E
>>>>>
>>>>> https://lists.apache.org/thread.html/20d11046d26174969ef44a781e409a1cb9f7c736e605fa40fdf98397@%3Cuser.beam.apache.org%3E
>>>>>
>>>>>
>>>>> On Mon, Nov 25, 2019 at 5:52 PM Aaron Dixon <[email protected]> wrote:
>>>>>
>>>>>> @Jan @Pablo Thank you
>>>>>>
>>>>>> @Pablo In this case it's a single global windowed Combine/perKey,
>>>>>> triggered per element. Keys are few (client accounts) so they can live
>>>>>> forever.
>>>>>>
>>>>>> It looks like just by virtue of using a stateful ParDo I could get
>>>>>> this final execution to be "serialized" per key. (Then I could simply do
>>>>>> the compare-and-swap using Beam's state mechanism to keep track of the
>>>>>> "latest trigger timestamp" instead of having to orchestrate
>>>>>> compare-and-swap in the target store :thinking:.)
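>>>>>>
>>>>>> Roughly what I have in mind - a sketch only (Beam Java SDK;
>>>>>> writeToStore is a placeholder for the actual db/cache write):
>>>>>>
>>>>>> import org.apache.beam.sdk.coders.VarLongCoder;
>>>>>> import org.apache.beam.sdk.state.StateSpec;
>>>>>> import org.apache.beam.sdk.state.StateSpecs;
>>>>>> import org.apache.beam.sdk.state.ValueState;
>>>>>> import org.apache.beam.sdk.transforms.DoFn;
>>>>>> import org.apache.beam.sdk.values.KV;
>>>>>> import org.joda.time.Instant;
>>>>>>
>>>>>> class CompareAndSwapSink extends DoFn<KV<String, Long>, Void> {
>>>>>>   // Latest event timestamp written so far, kept per key in Beam state.
>>>>>>   @StateId("latestTs")
>>>>>>   private final StateSpec<ValueState<Long>> latestTsSpec =
>>>>>>       StateSpecs.value(VarLongCoder.of());
>>>>>>
>>>>>>   @ProcessElement
>>>>>>   public void process(
>>>>>>       @Element KV<String, Long> element,
>>>>>>       @Timestamp Instant ts,
>>>>>>       @StateId("latestTs") ValueState<Long> latestTs) {
>>>>>>     Long latest = latestTs.read();
>>>>>>     if (latest == null || ts.getMillis() >= latest) {
>>>>>>       writeToStore(element.getKey(), element.getValue());
>>>>>>       latestTs.write(ts.getMillis());
>>>>>>     } // otherwise this firing is stale - skip the write
>>>>>>   }
>>>>>>
>>>>>>   private void writeToStore(String key, long value) {
>>>>>>     // stand-in for the actual db/cache write
>>>>>>   }
>>>>>> }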
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Nov 25, 2019 at 4:14 PM Jan Lukavský <[email protected]> wrote:
>>>>>>
>>>>>>> One addition, to make the list of options exhaustive, there is
>>>>>>> probably one more option:
>>>>>>>
>>>>>>>   c) create a ParDo keyed by the primary key of your sink, cache the
>>>>>>> last write in there, and compare it locally, without the need to
>>>>>>> query the database
>>>>>>>
>>>>>>> It would still need some timer to clear values after watermark +
>>>>>>> allowed lateness, because otherwise you would have to cache your
>>>>>>> whole database on workers. But because you don't need actual
>>>>>>> ordering, you just need the most recent value (if I got it right),
>>>>>>> this might be an option.
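>>>>>>>
>>>>>>> A rough sketch of c), assuming the Java SDK (names illustrative; the
>>>>>>> cached value lives in Beam state, and an event-time timer clears it
>>>>>>> once the window expires):
>>>>>>>
>>>>>>> import org.apache.beam.sdk.coders.VarLongCoder;
>>>>>>> import org.apache.beam.sdk.state.*;
>>>>>>> import org.apache.beam.sdk.transforms.DoFn;
>>>>>>> import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
>>>>>>> import org.apache.beam.sdk.values.KV;
>>>>>>> import org.joda.time.Instant;
>>>>>>>
>>>>>>> class LocalCacheSink extends DoFn<KV<String, Long>, Void> {
>>>>>>>   @StateId("lastTs")
>>>>>>>   private final StateSpec<ValueState<Long>> lastTsSpec =
>>>>>>>       StateSpecs.value(VarLongCoder.of());
>>>>>>>
>>>>>>>   @TimerId("cleanup")
>>>>>>>   private final TimerSpec cleanupSpec =
>>>>>>>       TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>>>>>>
>>>>>>>   @ProcessElement
>>>>>>>   public void process(
>>>>>>>       @Element KV<String, Long> element,
>>>>>>>       @Timestamp Instant ts,
>>>>>>>       BoundedWindow window,
>>>>>>>       @StateId("lastTs") ValueState<Long> lastTs,
>>>>>>>       @TimerId("cleanup") Timer cleanup) {
>>>>>>>     Long last = lastTs.read();
>>>>>>>     if (last == null || ts.getMillis() >= last) {
>>>>>>>       writeToStore(element.getKey(), element.getValue());
>>>>>>>       lastTs.write(ts.getMillis());
>>>>>>>     }
>>>>>>>     // In practice this should be maxTimestamp() plus the allowed
>>>>>>>     // lateness, so state is freed once no more data can arrive.
>>>>>>>     cleanup.set(window.maxTimestamp());
>>>>>>>   }
>>>>>>>
>>>>>>>   @OnTimer("cleanup")
>>>>>>>   public void onCleanup(@StateId("lastTs") ValueState<Long> lastTs) {
>>>>>>>     lastTs.clear();
>>>>>>>   }
>>>>>>>
>>>>>>>   private void writeToStore(String key, long value) {
>>>>>>>     // stand-in for the actual db/cache write
>>>>>>>   }
>>>>>>> }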
>>>>>>>
>>>>>>> Jan
>>>>>>>
>>>>>>> On 11/25/19 10:53 PM, Jan Lukavský wrote:
>>>>>>> > Hi Aaron,
>>>>>>> >
>>>>>>> > maybe someone else will give another option, but if I understand
>>>>>>> > correctly what you want to solve, then you essentially have to do
>>>>>>> > either:
>>>>>>> >
>>>>>>> >  a) use the compare & swap mechanism in the sink you described
>>>>>>> >
>>>>>>> >  b) use a buffer to buffer elements inside the outputting ParDo and
>>>>>>> > only output them when the watermark passes (using a timer) - see
>>>>>>> > the sketch below
>>>>>>> >
>>>>>>> > There is actually an ongoing discussion about how to make option b)
>>>>>>> > user-friendly and part of Beam itself, but currently there is no
>>>>>>> > out-of-the-box solution for that.
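>>>>>>> >
>>>>>>> > A rough sketch of b), assuming the Java SDK (names illustrative) -
>>>>>>> > buffer elements in Beam state and flush from an event-time timer
>>>>>>> > once the watermark passes the end of the window:
>>>>>>> >
>>>>>>> > import org.apache.beam.sdk.coders.VarLongCoder;
>>>>>>> > import org.apache.beam.sdk.state.*;
>>>>>>> > import org.apache.beam.sdk.transforms.DoFn;
>>>>>>> > import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
>>>>>>> > import org.apache.beam.sdk.values.KV;
>>>>>>> >
>>>>>>> > class BufferUntilWatermark extends DoFn<KV<String, Long>, Long> {
>>>>>>> >   @StateId("buffer")
>>>>>>> >   private final StateSpec<BagState<Long>> bufferSpec =
>>>>>>> >       StateSpecs.bag(VarLongCoder.of());
>>>>>>> >
>>>>>>> >   @TimerId("flush")
>>>>>>> >   private final TimerSpec flushSpec =
>>>>>>> >       TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>>>>>> >
>>>>>>> >   @ProcessElement
>>>>>>> >   public void process(
>>>>>>> >       @Element KV<String, Long> element,
>>>>>>> >       BoundedWindow window,
>>>>>>> >       @StateId("buffer") BagState<Long> buffer,
>>>>>>> >       @TimerId("flush") Timer flush) {
>>>>>>> >     buffer.add(element.getValue());
>>>>>>> >     // Fires when the watermark passes the end of the window (the
>>>>>>> >     // global window would need a different expiry).
>>>>>>> >     flush.set(window.maxTimestamp());
>>>>>>> >   }
>>>>>>> >
>>>>>>> >   @OnTimer("flush")
>>>>>>> >   public void flush(
>>>>>>> >       @StateId("buffer") BagState<Long> buffer,
>>>>>>> >       OutputReceiver<Long> out) {
>>>>>>> >     for (Long value : buffer.read()) {
>>>>>>> >       out.output(value);
>>>>>>> >     }
>>>>>>> >     buffer.clear();
>>>>>>> >   }
>>>>>>> > }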
>>>>>>> >
>>>>>>> > Jan
>>>>>>> >
>>>>>>> > On 11/25/19 10:27 PM, Aaron Dixon wrote:
>>>>>>> >> Suppose I trigger a Combine per-element (in a high-volume stream)
>>>>>>> >> and use a ParDo as a sink.
>>>>>>> >>
>>>>>>> >> I assume there is no guarantee about the order that my ParDo will
>>>>>>> >> see these triggers, especially as it processes in parallel, anyway.
>>>>>>> >>
>>>>>>> >> That said, my sink writes to a db or cache and I would not like
>>>>>>> >> the cache to ever regress its value to something "before" what it
>>>>>>> >> has already written.
>>>>>>> >>
>>>>>>> >> Is the best way to solve this problem to always write the event
>>>>>>> >> time in the cache and do a compare-and-swap, only updating the
>>>>>>> >> sink if the triggered value in-hand is later than the target value?
>>>>>>> >>
>>>>>>> >> Or is there a better way to guarantee that my ParDo sink will
>>>>>>> >> process elements in order? (E.g., if I can give up
>>>>>>> >> per-event/real-time, then a delay-based trigger would probably be
>>>>>>> >> sufficient, I imagine.)
>>>>>>> >>
>>>>>>> >> Thanks for the advice!
>>>>>>>
>>>>>>
