The consistency problem occurs even in a single output PCollection that is read as a side input, because two output elements can be re-bundled and materialized in separate updates to the side input.
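A minimal Java sketch of that single-view case, assuming the Beam Java SDK with a repeated element-count trigger; the class and transform names here are illustrative, and the point is only that nothing in the model today says ParDo(C) must observe X and Y in the same update of the view:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;

public class SingleViewConsistencySketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // ParDo(A): one DoFn emits two elements, X and Y, in the same bundle, under a
    // trigger that can fire repeatedly.
    PCollection<String> produced =
        p.apply("Seed", Create.of("seed"))
            .apply(
                Window.<String>into(new GlobalWindows())
                    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                    .withAllowedLateness(Duration.ZERO)
                    .discardingFiredPanes())
            .apply(
                "ParDoA",
                ParDo.of(
                    new DoFn<String, String>() {
                      @ProcessElement
                      public void process(ProcessContext c) {
                        c.output("X");
                        c.output("Y");
                      }
                    }));

    // The single output PCollection, materialized as a side input view.
    PCollectionView<Iterable<String>> view = produced.apply(View.<String>asIterable());

    // ParDo(C): reads the view. If X and Y are re-bundled and materialized in
    // separate updates, this consumer could observe X without Y, or vice versa.
    p.apply("Main", Create.of("main"))
        .apply(
            "ParDoC",
            ParDo.of(
                    new DoFn<String, String>() {
                      @ProcessElement
                      public void process(ProcessContext c) {
                        for (String s : c.sideInput(view)) {
                          c.output(s);
                        }
                      }
                    })
                .withSideInputs(view));

    p.run();
  }
}

In this shape a runner may re-bundle X and Y after ParDo(A) and publish them to the materialized view in separate updates, so the consumer can still see a partial result even though only one PCollection is involved.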
Kenn

On Thu, Apr 11, 2019 at 10:36 AM Ruoyun Huang <[email protected]> wrote:

> With little to no experience with triggers, I am trying to understand
> the problem statement in this discussion.
>
> If a user is aware of the potential non-deterministic behavior, isn't
> it almost trivial to refactor his/her user code by putting
> PCollectionViews S and T into one single PCollectionView S' to get
> around the issue? I cannot think of a reason (wrong?) why a user *has*
> to put data into two separate PCollectionViews in a single ParDo(A).
>
> On Thu, Apr 11, 2019 at 10:16 AM Lukasz Cwik <[email protected]> wrote:
>
>> Even though what Kenn points out is a major reason for me bringing up
>> this topic, I didn't want to limit this discussion to how side inputs
>> could work, but rather to ask what users want in general from their
>> side inputs when dealing with multiple firings.
>>
>> On Thu, Apr 11, 2019 at 10:09 AM Kenneth Knowles <[email protected]> wrote:
>>
>>> Luke & I talked in person a bit. I want to give context for what is
>>> at stake here, in terms of side inputs in portability. A decent
>>> starting place is https://s.apache.org/beam-side-inputs-1-pager
>>>
>>> In that general design, the runner offers the SDK just one (or a few)
>>> materialization strategies, and the SDK builds idiomatic structures
>>> on top of it. Concretely, the Fn API today offers a multimap
>>> structure, and the idea was that the SDK could cleverly prepare a
>>> PCollection<KV<...>> for the runner to materialize. As a naive
>>> example, a simple iterable structure could just map all elements to
>>> one dummy key in the multimap. But if you wanted a list plus its
>>> length, then you might map all elements to an element key and the
>>> length to a special length meta-key.
>>>
>>> So there is a problem: if the SDK is outputting a new
>>> KV<"elements-key", ...> and KV<"length-key", ...> for the runner to
>>> materialize, then consumers of the side input need to see both
>>> updates to the materialization or neither. In general, these outputs
>>> might span many keys.
>>>
>>> It seems like there are a few ways to resolve this tension:
>>>
>>> - Establish a consistency model so these updates will be observed
>>> together. Seems hard, and whatever we come up with will limit
>>> runners, limit efficiency, and potentially leak into users having to
>>> reason about concurrency.
>>>
>>> - Instead of building the variety of side input views on one
>>> primitive multimap materialization, force runners to provide many
>>> primitive materializations with consistency under the hood. Not hard
>>> to get started, but it adds an unfortunate new dimension for runners
>>> to vary in functionality and performance, versus letting them
>>> optimize just one or a few materializations.
>>>
>>> - Have no consistency and just not support side input methods that
>>> would require consistent metadata. I'm curious what features this
>>> will hurt.
>>>
>>> - Have no consistency, but require the SDK to build some sort of
>>> large value, since single-element consistency is always built into
>>> the model. Today many runners do concatenate all elements into one
>>> value, though that does not perform well. Making this effective
>>> probably requires new model features.
>>>
>>> Kenn
>>>
>>> On Thu, Apr 11, 2019 at 9:44 AM Reuven Lax <[email protected]> wrote:
>>>
>>>> One thing to keep in mind: triggers that fire multiple times per
>>>> window already tend to be non-deterministic. These are element-count
>>>> or processing-time triggers, both of which are fairly
>>>> non-deterministic in firing.
>>>>
>>>> Reuven
>>>>
>>>> On Thu, Apr 11, 2019 at 9:27 AM Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> Today, we define that a side input becomes available to be consumed
>>>>> once at least one firing occurs, or when the runner detects that no
>>>>> such output could be produced (e.g. the watermark is beyond the end
>>>>> of the window when using the default trigger). For triggers that
>>>>> fire at most once, consumers are guaranteed to have a consistent
>>>>> view of the contents of the side input. But what happens when the
>>>>> trigger fires multiple times?
>>>>>
>>>>> Let's say we have a pipeline containing:
>>>>>
>>>>> ParDo(A) --> PCollectionView S
>>>>>          \-> PCollectionView T
>>>>>
>>>>>    ...
>>>>>     |
>>>>> ParDo(C) <-(side input)- PCollectionView S and PCollectionView T
>>>>>     |
>>>>>    ...
>>>>>
>>>>> 1) Let's say ParDo(A) outputs (during a single bundle) X and Y to
>>>>> PCollectionView S. Should ParDo(C) be guaranteed to see X only if it
>>>>> can also see Y (and vice versa)?
>>>>>
>>>>> 2) Let's say ParDo(A) outputs (during a single bundle) X to
>>>>> PCollectionView S and Y to PCollectionView T. Should ParDo(C) be
>>>>> guaranteed to see X only if it can also see Y?
>
> --
> ================
> Ruoyun Huang
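
To make Kenn's "list plus its length" example above concrete, here is a rough Java sketch of how an SDK might prepare a PCollection<KV<...>> for a runner's multimap materialization. The key names follow the message above, but the transforms, the class name, and the flattened collection are illustrative assumptions rather than an existing Beam API for building side input views:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

public class MultimapPreparationSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<Long> values = p.apply("Values", Create.of(3L, 1L, 4L, 1L, 5L));

    // "List" part: every element is keyed by one well-known key.
    PCollection<KV<String, Long>> elements =
        values.apply("KeyElements", WithKeys.<String, Long>of("elements-key"));

    // "Length" part: the element count is keyed by a separate meta-key.
    PCollection<KV<String, Long>> length =
        values
            .apply("CountElements", Count.<Long>globally())
            .apply("KeyLength", WithKeys.<String, Long>of("length-key"));

    // The SDK would hand this flattened KV stream to the runner to materialize as
    // a multimap. With a multi-firing trigger, the update under "elements-key" and
    // the matching update under "length-key" arrive as separate elements.
    PCollection<KV<String, Long>> forMaterialization =
        PCollectionList.of(elements)
            .and(length)
            .apply(Flatten.<KV<String, Long>>pCollections());

    p.run();
  }
}

Because the "elements-key" updates and the matching "length-key" update are separate elements of forMaterialization, a runner that materializes them independently can expose a length that does not match the element list, which is the tension the options above try to resolve.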

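Ruoyun's suggestion of folding PCollectionViews S and T into a single PCollectionView S' could look roughly like the sketch below, which tags each output so both logical side inputs live in one multimap view; the tag strings and names are illustrative, and default triggering is used for brevity. As Kenn notes at the top of the thread, re-bundling means even a single view can be updated in pieces, so this narrows the problem rather than eliminating it.

import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

public class MergedViewSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // ParDo(A): instead of writing to two views S and T, tag each output so both
    // logical side inputs live in one PCollection.
    PCollection<KV<String, String>> tagged =
        p.apply("Seed", Create.of("seed"))
            .apply(
                "ParDoA",
                ParDo.of(
                    new DoFn<String, KV<String, String>>() {
                      @ProcessElement
                      public void process(ProcessContext c) {
                        c.output(KV.of("S", "X"));
                        c.output(KV.of("T", "Y"));
                      }
                    }));

    // One PCollectionView S' holding both logical side inputs.
    PCollectionView<Map<String, Iterable<String>>> merged =
        tagged.apply("AsMultimap", View.<String, String>asMultimap());

    // ParDo(C): reads the "S" and "T" entries out of the single merged view.
    p.apply("Main", Create.of("main"))
        .apply(
            "ParDoC",
            ParDo.of(
                    new DoFn<String, String>() {
                      @ProcessElement
                      public void process(ProcessContext c) {
                        Map<String, Iterable<String>> sPrime = c.sideInput(merged);
                        Iterable<String> fromS = sPrime.get("S");
                        if (fromS != null) {
                          for (String s : fromS) {
                            c.output(s);
                          }
                        }
                      }
                    })
                .withSideInputs(merged));

    p.run();
  }
}

ParDo(C) then reads both the "S" and "T" entries from the same materialized map rather than from two separately updated views.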