On Sat, Jan 4, 2020 at 11:03 AM Jan Lukavský <[email protected]> wrote:
> On 1/4/20 6:14 PM, Reuven Lax wrote:
>
> There is a very good reason not to define lateness directly in terms of
> the watermark. The model does not make any guarantees that the watermark
> advances synchronously, and in fact for the Dataflow runner the watermark
> advances asynchronously (i.e. independent of element processing). This
> means that simply comparing an element timestamp against the watermark
> creates a race condition. There are cases where the answer could change
> depending on exactly when you examine the watermark, and if you examine
> again while processing the same bundle you might come to a different
> conclusion about lateness.
>
> Due to monotonicity of watermark, I don't think that the asynchronous
> updates of watermark can change the answer from "late" to "not late". That
> seems fine to me.
>

It's the other way around. You check to see whether an element is late and
the answer is "not late." An instant later the answer changes to "late."
This does cause many problems, and is why this was changed.

>
> This non-determinism is undesirable when considering lateness, as it can
> break many invariants that users may rely on (e.g. I could write a ParDo
> that filtered all late data, yet still find late data showing up downstream
> of the ParDo, which would be very surprising). For that reason, the SDK
> always marks things as late based on deterministic signals. e.g. for a
> triggered GBK everything in the first post-watermark pane is marked as on
> time (no matter what the watermark is) and everything in subsequent panes
> is marked as late.
>
> Dropping latecomers will always be non-deterministic, that is certain.
> This is true even in case where watermark is updated synchronously with
> element processing, due to shuffling and varying (random) differences of
> processing and event time in upstream operator(s).
> The question was only if a latecomer should be dropped only at window
> boundaries (which is a sort of artificial time boundary), or right away
> when spotted (in stateful dofns only). Another question would be if
> latecomers should be dropped based on input or output watermark; dropping
> based on output watermark seems even to be stable in the sense that all
> downstream operators should come to the same conclusion (this is a bit of
> a speculation).
>

Yes, but invariants should hold. If I add a ParDo that drops late elements
(or, more commonly, diverts the late elements to a different PCollection),
then the result of that ParDo should _never_ introduce any more late data.
This cannot be guaranteed simply with watermark checks. The ParDo may decide
that the element was not late, but by the time it outputs the element the
watermark may have advanced, causing the element to actually be late. In
practice this is important. An early version of Dataflow (pre Beam)
implemented lateness by comparing against the watermark, and it caused no
end of trouble for users.

>
> FYI - this is also the reason why Beam does not currently provide users
> direct access to the watermark. The asynchronous nature of it can be very
> confusing, and often results in users writing bugs in their pipelines. We
> decided instead to expose easier-to-reason-about signals such as timers
> (triggered by the watermark), windows, and lateness.
>
> Reuven
>
> On Sat, Jan 4, 2020 at 1:15 AM Jan Lukavský <[email protected]> wrote:
>
>> I realized the problem. I misinterpreted the LateDataDroppingDoFnRunner.
>> It doesn't drop *all* late (arriving after watermark - allowed lateness)
>> data, but only data that arrives after maxTimestamp + allowedLateness of
>> their respective windows.
>>
>> Stateful DoFn can run on global window (which was the case of my tests)
>> and there is no dropping then.
>>
>> Two questions arise then:
>>
>> a) does it mean that this is one more argument to move this logic to
>> StatefulDoFnRunner? StatefulDoFnRunner performs state cleanup on window GC
>> time, so without LateDataDroppingDoFnRunner, late data will see empty
>> state and will produce wrong results.
>>
>> b) is this behavior generally intentional and correct? Windows and
>> triggers are (in my point of view) features of GBK, not stateful DoFn.
>> Stateful DoFn is a low level primitive, which can be viewed to operate on
>> "instant" windows, which should then probably be defined as dropping every
>> single element arriving after allowed lateness. This might probably relate
>> to the question if operations should be built bottom up from most primitive
>> and generic ones to more specific ones - that is GBK be implemented on top
>> of stateful DoFn and not vice versa.
>>
>> Thoughts?
>>
>> Jan
>> On 1/4/20 1:03 AM, Steve Niemitz wrote:
>>
>> I do agree that the direct runner doesn't drop late data arriving at a
>> stateful DoFn (I just tested as well).
>>
>> However, I believe this is consistent with other runners. I'm fairly
>> certain (at least last time I checked) that at least Dataflow will also
>> only drop late data at GBK operations, and NOT stateful DoFns. Whether or
>> not this is intentional is debatable; however, without being able to
>> inspect the watermark inside the stateful DoFn, it'd be very difficult to
>> do anything useful with late data.
>>
>>
>> On Fri, Jan 3, 2020 at 5:47 PM Jan Lukavský <[email protected]> wrote:
>>
>>> I did write a test that tested if data is dropped in a plain stateful
>>> DoFn. I did this as part of validating that PR [1] didn't drop more data
>>> when using @RequiresTimeSortedInput than it would without this annotation.
>>> This test failed and I didn't commit it, yet.
>>>
>>> The test was basically as follows:
>>>
>>> - use TestStream to generate three elements with timestamps 2, 1 and 0
>>>
>>> - between elements with timestamp 1 and 0 move watermark to 1
>>>
>>> - use allowed lateness of zero
>>>
>>> - use stateful dofn that just emits arbitrary data for each input
>>> element
>>>
>>> - use Count.globally to count outputs
>>>
>>> The outcome was that stateful dofn using @RequiresTimeSortedInput output
>>> 2 elements, without the annotation it was 3 elements. I think the correct
>>> one would be 2 elements in this case. The difference is caused by the
>>> annotation having (currently) its own logic for dropping data, which could
>>> be removed if we agree, that the data should be dropped in all cases.
>>> On 1/3/20 11:23 PM, Kenneth Knowles wrote:
>>>
>>> Did you write such a @Category(ValidatesRunner.class) test? I believe
>>> the Java direct runner does drop late data, for both GBK and stateful
>>> ParDo.
>>>
>>> Stateful ParDo is implemented on top of GBK:
>>> https://github.com/apache/beam/blob/64262a61402fad67d9ad8a66eaf6322593d3b5dc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java#L172
>>>
>>> And GroupByKey, via DirectGroupByKey, via DirectGroupAlsoByWindow, does
>>> drop late data:
>>> https://github.com/apache/beam/blob/c2f0d282337f3ae0196a7717712396a5a41fdde1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java#L220
>>>
>>> I'm not sure why it has its own code, since ReduceFnRunner also drops
>>> late data, and it does use ReduceFnRunner (the same code path all
>>> Java-based runners use).
>>>
>>> Kenn
>>>
>>>
>>> On Fri, Jan 3, 2020 at 1:02 PM Jan Lukavský <[email protected]> wrote:
>>>
>>>> Yes, the non-reliability of late data dropping in distributed runner is
>>>> understood.
>>>> But this is even where DirectRunner can play its role, because
>>>> only there it is actually possible to emulate and test specific watermark
>>>> conditions. Question regarding this for the java DirectRunner - should we
>>>> completely drop LateDataDroppingDoFnRunner and delegate the late data
>>>> dropping to StatefulDoFnRunner? Seems logical to me, as if we agree that
>>>> late data should always be dropped, then there would be no "valid" use of
>>>> StatefulDoFnRunner without the late data dropping functionality.
>>>> On 1/3/20 9:32 PM, Robert Bradshaw wrote:
>>>>
>>>> I agree, in fact we just recently enabled late data dropping to the
>>>> direct runner in Python to be able to develop better tests for Dataflow.
>>>>
>>>> It should be noted, however, that in a distributed runner (absent the
>>>> quiescence of TestStream) one can't *count* on late data being dropped
>>>> at a certain point, and in fact (due to delays in fully propagating the
>>>> watermark) late data can even become on-time, so the promises about what
>>>> happens behind the watermark are necessarily a bit loose.
>>>>
>>>> On Fri, Jan 3, 2020 at 9:15 AM Luke Cwik <[email protected]> wrote:
>>>>
>>>>> I agree that the DirectRunner should drop late data. Late data
>>>>> dropping is optional but the DirectRunner is used by many for testing and
>>>>> we should have the same behaviour they would get on other runners or users
>>>>> may be surprised.
>>>>>
>>>>> On Fri, Jan 3, 2020 at 3:33 AM Jan Lukavský <[email protected]> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I just found out that DirectRunner is apparently not using
>>>>>> LateDataDroppingDoFnRunner, which means that it doesn't drop late data
>>>>>> in cases where there is no GBK operation involved (dropping in GBK seems
>>>>>> to be correct).
>>>>>> There is apparently no @Category(ValidatesRunner) test for that
>>>>>> behavior (because DirectRunner would fail it), so the question is -
>>>>>> should late data dropping be considered part of model (of which
>>>>>> DirectRunner should be a canonical implementation) and therefore that
>>>>>> should be fixed there, or is the late data dropping an optional feature
>>>>>> of a runner?
>>>>>>
>>>>>> I'm strongly in favor of the first option, and I think it is likely that
>>>>>> all real-world runners would probably adhere to that (I didn't check
>>>>>> that, though).
>>>>>>
>>>>>> Opinions?
>>>>>>
>>>>>> Jan
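
For readers following the thread, the test described above (elements at 2, 1 and 0, watermark moved to 1 before the last element, allowed lateness of zero) can be modelled without Beam at all. The sketch below is only an illustration under stated assumptions: LatenessSketch, Step, PER_ELEMENT, GLOBAL_WINDOW and GLOBAL_MAX are hypothetical names, not Beam APIs, and the drop rule "expiry + allowedLateness is before the watermark" is a simplified reading of the behaviour discussed here, not the runners' actual code. PER_ELEMENT treats each element as its own "instant" window; GLOBAL_WINDOW treats every element as belonging to one window that ends far in the future.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.LongUnaryOperator;

// Hypothetical, Beam-free model of the test described in the thread.
final class LatenessSketch {
    // One step of the simulated stream: an element or a watermark advance.
    static final class Step {
        final boolean isWatermark;
        final long time;
        Step(boolean isWatermark, long time) {
            this.isWatermark = isWatermark;
            this.time = time;
        }
    }

    static Step element(long timestamp) { return new Step(false, timestamp); }
    static Step watermark(long value) { return new Step(true, value); }

    // Assumed stand-in for the maximum timestamp of the global window.
    static final long GLOBAL_MAX = Long.MAX_VALUE / 2;

    // "Instant" windows: an element expires at its own timestamp.
    static final LongUnaryOperator PER_ELEMENT = ts -> ts;
    // Global window: nothing expires until the end of time.
    static final LongUnaryOperator GLOBAL_WINDOW = ts -> GLOBAL_MAX;

    // Counts elements that survive; an element is dropped when
    // expiry(timestamp) + allowedLateness is before the current watermark.
    static int countOutputs(List<Step> steps, long allowedLateness,
                            LongUnaryOperator expiry) {
        long wm = Long.MIN_VALUE;
        int emitted = 0;
        for (Step s : steps) {
            if (s.isWatermark) {
                wm = Math.max(wm, s.time); // the watermark never moves backwards
            } else if (expiry.applyAsLong(s.time) + allowedLateness >= wm) {
                emitted++;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Step> steps = Arrays.asList(
            element(2), element(1), watermark(1), element(0));
        System.out.println(countOutputs(steps, 0, PER_ELEMENT));   // 2
        System.out.println(countOutputs(steps, 0, GLOBAL_WINDOW)); // 3
    }
}
```

Under these assumptions the per-element rule yields 2 outputs and the global-window rule yields 3, which matches the two counts reported in the thread for the runs with and without @RequiresTimeSortedInput. The model is synchronous, so it does not reproduce the race Reuven describes with an asynchronously advancing watermark.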
