>
> But I'm not sure if an IO throwing away windows (in favour of Global) would be acceptable either. BigQueryIO#write (streaming) also has a notion of "getFailedWrites" with the same intent as what's in ElasticsearchIO; I wonder how that's implemented?
>
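To see what is at stake in the question quoted above, here is a toy model in plain Java. It is not Beam's API: `GlobalWindowToy`, the stand-in constant for the end of the global window, and the simplified lateness rule ("the watermark has passed the window's max timestamp") are assumptions made for illustration. Under fixed windows, a buffered element becomes late once the watermark moves past its window; after re-windowing into a single global window whose end is effectively never reached, the same element cannot become late. That is presumably why re-windowing sidesteps the problem, at the cost of discarding the element's original window assignment.

```java
// Toy model, not Beam's API: compares lateness under fixed windows with
// lateness after every element is re-windowed into one global window.
class GlobalWindowToy {
    // Stand-in for "the end of time"; Beam's real global-window bound differs.
    static final long GLOBAL_WINDOW_MAX = Long.MAX_VALUE;

    // Max timestamp of the fixed window of the given size that contains ts.
    static long fixedWindowMaxTimestamp(long ts, long size) {
        return Math.floorDiv(ts, size) * size + size - 1;
    }

    // Simplified lateness rule: the watermark has passed the window's max timestamp.
    static boolean isLate(long windowMaxTimestamp, long watermark) {
        return windowMaxTimestamp < watermark;
    }

    public static void main(String[] args) {
        long ts = 3;          // element timestamp, in fixed window [0, 10)
        long watermark = 15;  // watermark has moved past that fixed window
        System.out.println("fixed window late:  " + isLate(fixedWindowMaxTimestamp(ts, 10), watermark));
        System.out.println("global window late: " + isLate(GLOBAL_WINDOW_MAX, watermark));
    }
}
```

Running `main` prints `true` for the fixed window and `false` for the global window.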
It appears that BigQueryIO[1] just re-windows inputs into the global window and does not concern itself with preserving the input windows of elements or holding the watermark. Is that a good precedent to follow? It would certainly simplify the implementation of ElasticsearchIO#bulkIO.

[1] https://github.com/apache/beam/blob/a126adbc6aa73f1e30adfa65a3710f7f69a7ba89/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java#L313-L318

On Wed, Mar 9, 2022 at 10:13 AM Evan Galpin <evan.gal...@gmail.com> wrote:

> Oops, unfortunate misfire before I was done writing my prior email.
>
> Thanks Jan for the thoughtful responses! It definitely sounds as though the ElasticsearchIO#write transform would be safer if it either held the watermark via machinery similar to that employed by GroupIntoBatches or output everything into a global window; either sounds safer in terms of data correctness than potentially outputting "on time" data as "late" data. But I'm not sure if an IO throwing away windows (in favour of Global) would be acceptable either. BigQueryIO#write (streaming) also has a notion of "getFailedWrites" with the same intent as what's in ElasticsearchIO; I wonder how that's implemented?
>
> I'm keen to follow any further discussion on watermark holds for DoFns that implement both @StartBundle and @FinishBundle!
>
> - Evan
>
> On Wed, Mar 9, 2022 at 10:05 AM Evan Galpin <evan.gal...@gmail.com> wrote:
>
>> Got it. So ProcessContext#output has the semantics of "using the window associated with the current @Element, output the passed-in data into that window." Makes complete sense.
>>
>> On Wed, Mar 9, 2022 at 4:15 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Interesting discussion. :-)
>>>
>>> Answers inline.
>>>
>>> On 3/8/22 22:00, Evan Galpin wrote:
>>>
>>> Thanks Jan for confirming that the fix looks alright.
>>> I also found a PR[1] that appears to be a good case study of the timer watermark hold technique you previously mentioned, so I'll study that a bit for my own understanding and future use. I was also previously missing the notion that a single bundle could contain elements from many disparate windows, so that's a great upgrade to my own mental model, haha.
>>>
>>> I have a few follow-up questions for my own understanding (hopefully not off topic).
>>>
>>> 1. Am I understanding correctly that using ProcessContext#output to output a given element will preserve the element's input window(s)? Or is it the case that when using ProcessContext#output, any buffered elements will all be output to whatever window the element most recently processed by @ProcessElement belongs to?
>>>
>>> There is no way Beam can distinguish *how* you produced the elements emitted from @ProcessElement (whether they are the result of some in-memory buffering or the direct result of computation on the single input element currently being processed). It follows that the window(s) associated with the output will be equal to the window(s) of the input element currently being processed. (Generally, the model allows multiple applications of the window function, which is why window.maxTimestamp is shifted 1 ms backwards: it is then still part of the window itself, so window functions based on timestamps are idempotent.)
>>>
>>> 2. Is it safe to emit outputs from @FinishBundle to windows which may be older than the watermark? I believe this could still be happening given the current ElasticsearchIO implementation, where any buffered elements are output in @FinishBundle using the same window they were associated with on input. Intuitively, that sounds as though it could be a correctness bug if the watermark had progressed beyond those windows.
>>>
>>> This is a great question that probably deserves deeper attention.
>>>
>>> a) First of all, the fact that you could potentially output late data that previously was not late is _potentially_ a correctness bug if you have any downstream logic that performs additional grouping and, most notably, relies on causality being preserved (which is how our universe works, so this is a natural requirement). Changing an element from "on time" to "late" can result in downstream processing seeing an effect prior to its cause (in event time!, because the watermark move can cause downstream timers to fire). I wrote something about this in [1]. Generally, if there is no grouping (or your application logic accounts for the swapping of cause and effect), then this is not an issue. If you produce a PCollection for a user, then you don't know what the user does with it and therefore should pay attention to this.
>>>
>>> b) Second, this effect would not appear if mid-bundle output watermark updates were not possible. I emphasize that I don't know if this is allowed in the model, but I *suppose* it is (@kenn, @reuven or @luke, can you correct me if I'm wrong?). I have some doubts about whether it is correct, though. It seems to cause general issues with in-memory buffering and 'outputWithTimestamp' in stateless DoFns. If a DoFn implements @StartBundle and @FinishBundle, it seems that we should hold the output watermark between these two calls. But that would turn a stateless DoFn into a stateful one, which is ... unfortunate. :)
>>>
>>> I hope I made things a little clearer rather than more obfuscated. :)
>>>
>>> Jan
>>>
>>> [1] https://twitter.com/janl_apache/status/1478757956263071745
>>>
>>>
>>> It's definitely interesting to think about the idea of enforcing watermark updates only between bundles, but presumably that would mean quite extensive alterations.
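Jan's point b) can be illustrated with a toy model in plain Java. This is not Beam's API: `BundleWatermarkModel`, its 10-unit fixed windows, and the simplified rule "an element is late once the output watermark passes its window's max timestamp" are assumptions for illustration. The model buffers one bundle and flushes it in a simulated @FinishBundle, once with the output watermark allowed to follow the input watermark mid-bundle and once with it held at its bundle-start value. `windowMaxTimestamp` returns the window end minus 1, mirroring the 1 ms shift Jan describes, so re-assigning a window from that timestamp yields the same window.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, not Beam's API. Fixed windows are WINDOW_SIZE units wide, and an
// element counts as late once the output watermark has passed its window's max
// timestamp (a simplification of Beam's lateness rules).
class BundleWatermarkModel {
    static final long WINDOW_SIZE = 10;

    static long windowStart(long ts) {
        return Math.floorDiv(ts, WINDOW_SIZE) * WINDOW_SIZE;
    }

    // End of the window minus 1, so the max timestamp still maps to the same window.
    static long windowMaxTimestamp(long ts) {
        return windowStart(ts) + WINDOW_SIZE - 1;
    }

    static boolean isLate(long ts, long outputWatermark) {
        return windowMaxTimestamp(ts) < outputWatermark;
    }

    // Buffers every element of one bundle and flushes them all in a simulated
    // @FinishBundle; returns how many flushed elements are late at that point.
    // If holdDuringBundle is false, the output watermark follows the input
    // watermark mid-bundle; if true, it stays at its bundle-start value.
    static int lateOnFlush(long[] timestamps, long[] inputWatermarkAfterEach,
                           long watermarkAtStart, boolean holdDuringBundle) {
        List<Long> buffer = new ArrayList<>();
        long outputWatermark = watermarkAtStart;
        for (int i = 0; i < timestamps.length; i++) {
            buffer.add(timestamps[i]); // in-memory buffering within the bundle
            if (!holdDuringBundle) {
                outputWatermark = Math.max(outputWatermark, inputWatermarkAfterEach[i]);
            }
        }
        int late = 0;
        for (long ts : buffer) { // simulated @FinishBundle flush
            if (isLate(ts, outputWatermark)) {
                late++;
            }
        }
        return late;
    }

    public static void main(String[] args) {
        long[] timestamps = {3, 12};      // windows [0, 10) and [10, 20)
        long[] inputWatermarks = {5, 15}; // input watermark after each element
        System.out.println(lateOnFlush(timestamps, inputWatermarks, 0, false)); // 1
        System.out.println(lateOnFlush(timestamps, inputWatermarks, 0, true));  // 0
        System.out.println(windowStart(windowMaxTimestamp(3)) == windowStart(3)); // true
    }
}
```

Without the hold, the element at timestamp 3 was on time when buffered but is late by the time it is flushed; holding the watermark for the duration of the bundle keeps it on time.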
>>>
>>> - Evan
>>>
>>> [1] https://github.com/apache/beam/pull/15249
>>>
>>> On Tue, Mar 8, 2022 at 10:29 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi Evan,
>>>>
>>>> the fix looks good to me, as long as the timestamps of the buffered data need to be preserved downstream. Generally I think it *should* be possible to output in-memory buffered data in @ProcessElement (and @FinishBundle); the case where you need timers is when your buffer needs to span multiple bundles. In that case it must also be stored in regular state rather than in memory.
>>>>
>>>> It seems to me that there is some logical gap in how we handle per-bundle buffering. I see two possibilities:
>>>>
>>>> a) either we allow the model to change the output watermark *while* processing a bundle, in which case it is a logical requirement that the output timestamp from @ProcessElement be no earlier than the timestamp of the current element (because that way we preserve the "on time"/"late" status of the current element; we don't swap anything), or
>>>>
>>>> b) we enforce output watermark updates only in between bundles, in which case the requirement could be relaxed so that the output timestamp from @ProcessElement must be no earlier than the minimum of the timestamps inside the bundle.
>>>>
>>>> I'm afraid that our current position is a). But in that case it is somewhat questionable whether it is semantically correct to use outputWithTimestamp() in @ProcessElement of a stateless DoFn at all. It can move timestamps only to a future instant (and inside the same window!), which has little semantic meaning to me. It looks more error-prone than useful.
>>>>
>>>> Jan
>>>>
>>>> On 3/8/22 15:53, Evan Galpin wrote:
>>>>
>>>> Thanks Jan, it's interesting to read about the handling of timestamps in cases employing a buffering pattern. In the case of the ES write transform, buffered data could be output from ProcessElement or FinishBundle.
>>>> The error reported at the start of this thread shows up in the case where data is output from ProcessElement. Based on what you described, it sounds like the PR[1] I made to "fix" this issue actually fixes it by transitioning data from being late to being on time, and by outputting all buffered data into a non-deterministic window: whichever window is current when the output heuristic (number of elements buffered or max time since last output) is next satisfied. It seems there are two issues as a result:
>>>>
>>>> 1. The window to which input elements belong is not being preserved. Presumably we want IOs to leave an element's window unaltered?
>>>> 2. The system's watermark might be incorrect, given that buffered data is assumed to be processed and the watermark is allowed to pass it?
>>>>
>>>> It would definitely add complexity to the IO to employ timers to address this, but if that's the preferred or only solution I'll put some thought into how to implement it.
>>>>
>>>> Thanks,
>>>> Evan
>>>>
>>>> [1] https://github.com/apache/beam/pull/16744
>>>>
>>>> On Tue, Mar 8, 2022 at 3:47 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Ah, sorry, the code flushes in @FinishBundle. Is it allowed to update the output watermark while a bundle is being processed? It seems that could also cause the "watermark skip" problem, which is definitely an issue (and is probably the reason why the check fails?).
>>>>>
>>>>> On 3/8/22 09:35, Jan Lukavský wrote:
>>>>>
>>>>> The buffering seems incorrect to me. Whenever there is a buffer, we need to make sure we hold the output watermark; otherwise the watermark might "jump over" a buffered element, transitioning it from "on-time" to "late", which would be a correctness bug (we can transition elements only from "late" to "on-time", never the other way around). The alternative is to use @FinishBundle to do the flushing, but that might not be appropriate here.
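Evan's first issue (and Luke's X0/Y0 description further down the thread) can be sketched with another plain-Java toy. `FlushWindowModel`, its 1-minute windows, and the batch-size flush trigger are hypothetical; the only behaviour borrowed from the thread is that `ProcessContext#output` attaches the current input element's timestamp and window to whatever is emitted. X0 is buffered while its own window is current, the flush happens while Y0 (in a later window) is being processed, and X0 comes out in Y0's window.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, not Beam's API: output(value) stamps the value with the timestamp of
// the element currently being processed, the way the thread describes
// ProcessContext#output. Windows are hypothetical 1-minute fixed windows.
class FlushWindowModel {
    static final long WINDOW_SIZE_MS = 60_000;

    static final class Stamped {
        final String value;
        final long timestampMs;

        Stamped(String value, long timestampMs) {
            this.value = value;
            this.timestampMs = timestampMs;
        }

        long windowStart() {
            return Math.floorDiv(timestampMs, WINDOW_SIZE_MS) * WINDOW_SIZE_MS;
        }
    }

    private final List<Stamped> buffer = new ArrayList<>();
    final List<Stamped> outputs = new ArrayList<>();

    // Simulated @ProcessElement: buffer the element and flush once maxBatchSize is reached.
    void processElement(Stamped current, int maxBatchSize) {
        buffer.add(current);
        if (buffer.size() >= maxBatchSize) {
            for (Stamped buffered : buffer) {
                // Like ProcessContext#output: the buffered value inherits the
                // current element's timestamp, not its own.
                outputs.add(new Stamped(buffered.value, current.timestampMs));
            }
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        FlushWindowModel fn = new FlushWindowModel();
        fn.processElement(new Stamped("X0", 30_000), 2); // window starting at 0
        fn.processElement(new Stamped("Y0", 90_000), 2); // window starting at 60000; triggers flush
        for (Stamped s : fn.outputs) {
            System.out.println(s.value + " -> window starting at " + s.windowStart());
        }
    }
}
```

Both outputs land in the window starting at 60000, so the validation passes, but X0's original window is lost, which is the side effect Evan suspected.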
>>>>>
>>>>> Currently, the only way to limit the progress of the output watermark is by setting a timer whose output timestamp is the timestamp of the earliest element in the buffer. There was a thread discussing this in more detail [1].
>>>>>
>>>>> Jan
>>>>>
>>>>> [1] https://lists.apache.org/thread/y4n5rnhfmh47orhhr92og2r0plvgopl8
>>>>>
>>>>> On 3/7/22 19:54, Evan Galpin wrote:
>>>>>
>>>>> In general, I'd also really like to improve my understanding and learn more about how the employed buffering can cause this skew. Is it because the call to "flush" is happening from a different "current window" than the one the elements were originally buffered from? I'm actually thinking that the PR[1] to "fix" this would have had the side effect of outputting buffered elements into the window from which "flush" was called rather than the window from which the buffered data originated. I suppose that could be problematic, but it should at least satisfy the validation code.
>>>>>
>>>>> [1] https://github.com/apache/beam/pull/16744
>>>>>
>>>>> On Mon, Mar 7, 2022 at 1:39 PM Evan Galpin <evan.gal...@gmail.com> wrote:
>>>>>
>>>>>> x-post from the associated Jira ticket[0]
>>>>>>
>>>>>>
>>>>>> Fortunately/unfortunately this same issue struck me as well, and I opened a PR[1] to use `ProcessContext#output` rather than `ProcessContext#outputWithTimestamp`. I believe that should resolve this issue; it has for me when running jobs with a vendored SDK with that change included. Do folks feel this change should be cherry-picked into 2.37.0?
>>>>>>
>>>>>> The change also prompted a question to the mailing list[2] about the skew validation difference between ProcessContext and FinishBundleContext (where, as I understand it, there is no ability to compute skew).
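Jan's timer-based hold comes down to one rule: while anything is buffered, the step's output watermark may not pass the earliest buffered timestamp. The sketch below models only that rule in plain Java; `BufferWatermarkHold` is hypothetical and not part of Beam. In an actual pipeline the hold would come from a timer with an output timestamp (exposed in the Java SDK as `Timer#withOutputTimestamp`), which, as Jan notes elsewhere in the thread, means the DoFn has to be stateful.

```java
import java.util.PriorityQueue;

// Toy model, not Beam's API: the output watermark of a buffering step is the
// input watermark, capped by a hold at the earliest timestamp still buffered.
// In Beam, that hold would come from a timer's output timestamp.
class BufferWatermarkHold {
    // Min-heap, so peek() returns the earliest buffered timestamp.
    private final PriorityQueue<Long> buffered = new PriorityQueue<>();

    void add(long timestamp) {
        buffered.add(timestamp);
    }

    // Emits the whole batch, which releases the hold.
    void flushAll() {
        buffered.clear();
    }

    long outputWatermark(long inputWatermark) {
        if (buffered.isEmpty()) {
            return inputWatermark;
        }
        return Math.min(inputWatermark, buffered.peek());
    }

    public static void main(String[] args) {
        BufferWatermarkHold hold = new BufferWatermarkHold();
        hold.add(140);
        hold.add(100);
        System.out.println(hold.outputWatermark(200)); // 100: held at the earliest buffered element
        hold.flushAll();
        System.out.println(hold.outputWatermark(200)); // 200: hold released
    }
}
```

A running minimum would be enough when the whole buffer is flushed at once, as here; the min-heap only pays off if elements can leave the buffer individually.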
>>>>>>
>>>>>> [0] https://issues.apache.org/jira/browse/BEAM-14064
>>>>>>
>>>>>> [1] https://github.com/apache/beam/pull/16744
>>>>>>
>>>>>> [2] https://lists.apache.org/thread/33kj1yjmn6kkvpl4vz02vnfyn2bpzycp
>>>>>>
>>>>>> On Mon, Mar 7, 2022 at 12:41 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>>> This is specifically a case where @ProcessElement saw window X for element X0 and buffered it into memory, and then, when processing window Y and element Y0, wanted to flush the previously buffered element X0. This all occurred as part of the same bundle.
>>>>>>>
>>>>>>> In general, yes, outputting to an earlier window is problematic.
>>>>>>>
>>>>>>> On Mon, Mar 7, 2022 at 9:32 AM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> Outputting to an earlier window is problematic, as the watermark can never be correct if a DoFn can move time backwards arbitrarily.
>>>>>>>>
>>>>>>>> On Mon, Mar 7, 2022 at 9:01 AM Luke Cwik <lc...@google.com> wrote:
>>>>>>>>
>>>>>>>>> A good question would be: should I be able to output to a different window from the current @ProcessElement call, like what we can do from @FinishBundle to handle these buffering scenarios?
>>>>>>>>>
>>>>>>>>> On Mon, Mar 7, 2022 at 8:53 AM Luke Cwik <lc...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> The issue is that ElasticsearchIO is collecting results from elements in window X and then trying to output them in window Y when flushing the batch. This exposed a bug where elements that were being buffered were being output as part of a different window than the window that produced them.
>>>>>>>>>>
>>>>>>>>>> This became visible because validation was added recently to ensure that, when the pipeline is processing elements in window X, output with a timestamp is valid for window X.
>>>>>>>>>> Note that this validation only occurs in @ProcessElement, since output there is associated with the current window of the input element being processed.
>>>>>>>>>>
>>>>>>>>>> It is OK to do this in @FinishBundle, since there is no existing windowing context there; when you output, the element is assigned to an appropriate window.
>>>>>>>>>>
>>>>>>>>>> Filed https://issues.apache.org/jira/browse/BEAM-14064
>>>>>>>>>>
>>>>>>>>>> On Mon, Mar 7, 2022 at 7:44 AM Emils Solmanis <emils.solma...@rvu.co.uk> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi all,
>>>>>>>>>>> I think we're hitting a regression in ElasticsearchIO batch writing.
>>>>>>>>>>>
>>>>>>>>>>> We've bisected it to being introduced in 2.35.0, and I'm reasonably certain it's this PR: https://github.com/apache/beam/pull/15381
>>>>>>>>>>>
>>>>>>>>>>> Our scenario is pretty trivial: we read off Pubsub and write to Elastic in a streaming job. The config for the source and sink is, respectively,
>>>>>>>>>>>
>>>>>>>>>>> pipeline.apply(
>>>>>>>>>>>     PubsubIO.readStrings().fromSubscription(subscription)
>>>>>>>>>>> ).apply(ParseJsons.of(OurObject::class.java))
>>>>>>>>>>>     .setCoder(KryoCoder.of())
>>>>>>>>>>>
>>>>>>>>>>> and
>>>>>>>>>>>
>>>>>>>>>>> ElasticsearchIO.write()
>>>>>>>>>>>     .withUseStatefulBatches(true)
>>>>>>>>>>>     .withMaxParallelRequestsPerWindow(1)
>>>>>>>>>>>     .withMaxBufferingDuration(Duration.standardSeconds(30))
>>>>>>>>>>>     // 5 bytes -> KiB -> MiB, so 5 MiB
>>>>>>>>>>>     .withMaxBatchSizeBytes(5L * 1024 * 1024)
>>>>>>>>>>>     // # of docs
>>>>>>>>>>>     .withMaxBatchSize(1000)
>>>>>>>>>>>     .withConnectionConfiguration(
>>>>>>>>>>>         ElasticsearchIO.ConnectionConfiguration.create(
>>>>>>>>>>>             arrayOf(host),
>>>>>>>>>>>             "fubar",
>>>>>>>>>>>             "_doc"
>>>>>>>>>>>         ).withConnectTimeout(5000)
>>>>>>>>>>>             .withSocketTimeout(30000)
>>>>>>>>>>>     )
>>>>>>>>>>>     .withRetryConfiguration(
>>>>>>>>>>>         ElasticsearchIO.RetryConfiguration.create(
>>>>>>>>>>>             10,
>>>>>>>>>>>             // The duration is wall clock, against the connection and socket timeouts specified
>>>>>>>>>>>             // above. I.e., 10 x 30s is gonna be more than 4 minutes, so if we're getting
>>>>>>>>>>>             // 10 socket timeouts in a row, this would ignore the "10" part and terminate
>>>>>>>>>>>             // after 8. The idea is that in a mixed failure mode, you'd get different timeouts
>>>>>>>>>>>             // of different durations, and on average 10 x fails < 4m.
>>>>>>>>>>>             // That said, 4m is arbitrary, so adjust as and when needed.
>>>>>>>>>>>             Duration.standardMinutes(4)
>>>>>>>>>>>         )
>>>>>>>>>>>     )
>>>>>>>>>>>     .withIdFn { f: JsonNode -> f["id"].asText() }
>>>>>>>>>>>     .withIndexFn { f: JsonNode -> f["schema_name"].asText() }
>>>>>>>>>>>     .withIsDeleteFn { f: JsonNode -> f["_action"].asText("noop") == "delete" }
>>>>>>>>>>>
>>>>>>>>>>> We recently tried upgrading from 2.33 to 2.36 and immediately hit a bug in the consumer due to alleged time skew, specifically
>>>>>>>>>>>
>>>>>>>>>>> 2022-03-07 10:48:37.886 GMT Error message from worker: java.lang.IllegalArgumentException: Cannot output with timestamp 2022-03-07T10:43:38.640Z. Output timestamps must be no earlier than the timestamp of the current input (2022-03-07T10:43:43.562Z) minus the allowed skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:446)
>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:422)
>>>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn$ProcessContextAdapter.output(ElasticsearchIO.java:2364)
>>>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2404)
>>>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.addAndMaybeFlush(ElasticsearchIO.java:2419)
>>>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOStatefulFn.processElement(ElasticsearchIO.java:2300)
>>>>>>>>>>>
>>>>>>>>>>> I've bisected it: 2.34 works fine, 2.35 is the first version where this breaks, and it seems like the code in the trace was largely added by the PR linked above. The error usually claims a skew of a few seconds, but obviously I can't override getAllowedTimestampSkew() on the internal Elastic DoFn, and it's marked deprecated anyway.
>>>>>>>>>>>
>>>>>>>>>>> I'm happy to raise a JIRA, but I'm not 100% sure what the code was intending to fix. Additionally, I'd be happy if someone else can reproduce this or knows of similar reports. I feel like what we're doing is not *that* uncommon a scenario, so I would have thought someone else would have hit this by now.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Emils
>>>>>>>>>>>
>>>>>>>>>>
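The failing check can be re-created outside Beam to see why the output was rejected. `TimestampSkewCheck` is a hypothetical plain-Java sketch of the rule as the error message states it (output no earlier than the current input's timestamp minus the allowed skew, and no later than a fixed maximum), not the SDK's implementation; the maximum is written as `Long.MAX_VALUE / 1000` epoch milliseconds, which corresponds to the 294247-01-10T04:00:54.775Z bound printed in the log. With the timestamps from the log, the buffered output is 4.922 s earlier than the current input, so a zero skew rejects it. A zero allowed skew is exactly Jan's option a) from earlier in the thread.

```java
import java.time.Duration;
import java.time.Instant;

// Simplified re-creation of the rule stated in the error message; the real check
// lives in SimpleDoFnRunner and may differ in detail.
class TimestampSkewCheck {
    // Upper bound printed in the log (294247-01-10T04:00:54.775Z), as epoch millis.
    static final Instant MAX_TIMESTAMP = Instant.ofEpochMilli(Long.MAX_VALUE / 1000);

    static boolean isAllowed(Instant output, Instant currentInput, Duration allowedSkew) {
        Instant lowerBound = currentInput.minus(allowedSkew);
        return !output.isBefore(lowerBound) && !output.isAfter(MAX_TIMESTAMP);
    }

    public static void main(String[] args) {
        // Timestamps copied from the error message in the thread.
        Instant bufferedOutput = Instant.parse("2022-03-07T10:43:38.640Z");
        Instant currentInput = Instant.parse("2022-03-07T10:43:43.562Z");
        System.out.println(Duration.between(bufferedOutput, currentInput));                 // PT4.922S
        System.out.println(isAllowed(bufferedOutput, currentInput, Duration.ZERO));         // false
        System.out.println(isAllowed(bufferedOutput, currentInput, Duration.ofSeconds(5))); // true
    }
}
```

Raising the skew (via the deprecated `DoFn#getAllowedTimestampSkew`) would make this particular check pass, but, as Reuven points out above, a DoFn that can move time backwards arbitrarily leaves the watermark unable to be correct, so the skew only hides the problem.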