x-post from the associated Jira ticket[0]
Fortunately/unfortunately this same issue struck me as well, and I opened a PR[1] to use `ProcessContext#output` rather than `ProcessContext#outputWithTimestamp`. I believe that should resolve this issue; it has for me when running jobs with a vendored SDK that includes the change. Do folks feel this change should be cherry-picked into 2.37.0?

The change also prompted a question to the mailing list[2] about the difference in skew validation between ProcessContext and FinishBundleContext (where there is no way to compute skew, as I understand it).

[0] https://issues.apache.org/jira/browse/BEAM-14064
[1] https://github.com/apache/beam/pull/16744
[2] https://lists.apache.org/thread/33kj1yjmn6kkvpl4vz02vnfyn2bpzycp

On Mon, Mar 7, 2022 at 12:41 PM Luke Cwik <lc...@google.com> wrote:

> This is specifically a case where the @ProcessElement saw window X for
> element X0 and buffered it into memory, and then, when processing window Y
> and element Y0, wanted to flush previously buffered element X0. This all
> occurred as part of the same bundle.
>
> In general, yes, outputting to an earlier window is problematic.
>
> On Mon, Mar 7, 2022 at 9:32 AM Reuven Lax <re...@google.com> wrote:
>
>> Outputting to an earlier window is problematic, as the watermark can
>> never be correct if a DoFn can move time backwards arbitrarily.
>>
>> On Mon, Mar 7, 2022 at 9:01 AM Luke Cwik <lc...@google.com> wrote:
>>
>>> A good question would be: should I be able to output to a different
>>> window from the current @ProcessElement call, like what we can do from
>>> @FinishBundle, to handle these buffering scenarios?
>>>
>>> On Mon, Mar 7, 2022 at 8:53 AM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> The issue is that ElasticsearchIO is collecting results from elements
>>>> in window X and then trying to output them in window Y when flushing the
>>>> batch. This exposed a bug where buffered elements were being output as
>>>> part of a different window than the one that produced them.
>>>>
>>>> This became visible because validation was added recently to ensure
>>>> that when the pipeline is processing elements in window X, output with a
>>>> timestamp is valid for window X. Note that this validation only occurs in
>>>> @ProcessElement, since output there is associated with the window of the
>>>> input element currently being processed.
>>>>
>>>> It is ok to do this in @FinishBundle, since there is no existing
>>>> windowing context and, when you output, the element is assigned to an
>>>> appropriate window.
>>>>
>>>> Filed https://issues.apache.org/jira/browse/BEAM-14064
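A minimal sketch of the buffer-and-flush pattern described just above, assuming a plain DoFn over strings (BufferAndFlushFn and its fields are illustrative names, not Beam's or ElasticsearchIO's code): buffer each element in @ProcessElement together with its timestamp and window, and flush from @FinishBundle, whose context lets you name the window each buffered element belongs to.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.joda.time.Instant;

    // Illustrative buffering DoFn; not ElasticsearchIO's actual implementation.
    class BufferAndFlushFn extends DoFn<String, String> {

      // An element together with the timestamp and window it arrived in.
      private static class Buffered {
        final String value;
        final Instant timestamp;
        final BoundedWindow window;

        Buffered(String value, Instant timestamp, BoundedWindow window) {
          this.value = value;
          this.timestamp = timestamp;
          this.window = window;
        }
      }

      private transient List<Buffered> buffer;

      @StartBundle
      public void startBundle() {
        buffer = new ArrayList<>();
      }

      @ProcessElement
      public void processElement(ProcessContext c, BoundedWindow window) {
        // Only buffer here; outputs from @ProcessElement are validated against
        // the current input element's timestamp and window.
        buffer.add(new Buffered(c.element(), c.timestamp(), window));
      }

      @FinishBundle
      public void finishBundle(FinishBundleContext c) {
        // FinishBundleContext#output takes both the timestamp and the window, so
        // each buffered element can be emitted into the window that produced it.
        for (Buffered b : buffer) {
          c.output(b.value, b.timestamp, b.window);
        }
        buffer.clear();
      }
    }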
>>>> On Mon, Mar 7, 2022 at 7:44 AM Emils Solmanis <emils.solma...@rvu.co.uk> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I think we're hitting a regression in ElasticIO batch writing.
>>>>>
>>>>> We've bisected it to being introduced in 2.35.0, and I'm reasonably
>>>>> certain it's this PR: https://github.com/apache/beam/pull/15381
>>>>>
>>>>> Our scenario is pretty trivial: we read off Pubsub and write to Elastic
>>>>> in a streaming job. The config for the source and sink, respectively, is
>>>>>
>>>>> pipeline.apply(
>>>>>     PubsubIO.readStrings().fromSubscription(subscription)
>>>>> ).apply(ParseJsons.of(OurObject::class.java))
>>>>>     .setCoder(KryoCoder.of())
>>>>>
>>>>> and
>>>>>
>>>>> ElasticsearchIO.write()
>>>>>     .withUseStatefulBatches(true)
>>>>>     .withMaxParallelRequestsPerWindow(1)
>>>>>     .withMaxBufferingDuration(Duration.standardSeconds(30))
>>>>>     // 5 bytes -> KiB -> MiB, so 5 MiB
>>>>>     .withMaxBatchSizeBytes(5L * 1024 * 1024)
>>>>>     // # of docs
>>>>>     .withMaxBatchSize(1000)
>>>>>     .withConnectionConfiguration(
>>>>>         ElasticsearchIO.ConnectionConfiguration.create(
>>>>>             arrayOf(host),
>>>>>             "fubar",
>>>>>             "_doc"
>>>>>         ).withConnectTimeout(5000)
>>>>>             .withSocketTimeout(30000)
>>>>>     )
>>>>>     .withRetryConfiguration(
>>>>>         ElasticsearchIO.RetryConfiguration.create(
>>>>>             10,
>>>>>             // The duration is wall clock, against the connection and socket
>>>>>             // timeouts specified above. I.e., 10 x 30s is gonna be more than
>>>>>             // 3 minutes, so if we're getting 10 socket timeouts in a row, this
>>>>>             // would ignore the "10" part and terminate after 6. The idea is
>>>>>             // that in a mixed failure mode, you'd get different timeouts of
>>>>>             // different durations, and on average 10 x fails < 4m.
>>>>>             // That said, 4m is arbitrary, so adjust as and when needed.
>>>>>             Duration.standardMinutes(4)
>>>>>         )
>>>>>     )
>>>>>     .withIdFn { f: JsonNode -> f["id"].asText() }
>>>>>     .withIndexFn { f: JsonNode -> f["schema_name"].asText() }
>>>>>     .withIsDeleteFn { f: JsonNode -> f["_action"].asText("noop") == "delete" }
>>>>>
>>>>> We recently tried upgrading from 2.33 to 2.36 and immediately hit a bug
>>>>> in the consumer due to alleged time skew, specifically:
>>>>>
>>>>> 2022-03-07 10:48:37.886 GMT Error message from worker:
>>>>> java.lang.IllegalArgumentException: Cannot output with timestamp
>>>>> 2022-03-07T10:43:38.640Z. Output timestamps must be no earlier than the
>>>>> timestamp of the current input (2022-03-07T10:43:43.562Z) minus the allowed
>>>>> skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the
>>>>> DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:446)
>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:422)
>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn$ProcessContextAdapter.output(ElasticsearchIO.java:2364)
>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2404)
>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.addAndMaybeFlush(ElasticsearchIO.java:2419)
>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOStatefulFn.processElement(ElasticsearchIO.java:2300)
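The validation that throws here, SimpleDoFnRunner$DoFnProcessContext.checkTimestamp, amounts to roughly the comparison below; this is a paraphrase for illustration only (SkewCheckSketch is a made-up name, not the actual runner source). With DoFn#getAllowedTimestampSkew() defaulting to Duration.ZERO, any output timestamp older than the current input element's timestamp is rejected.

    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.joda.time.Duration;
    import org.joda.time.Instant;

    // Paraphrased sketch of the check behind the error above; not the actual
    // SimpleDoFnRunner code.
    final class SkewCheckSketch {
      static void checkTimestamp(Instant inputTs, Duration allowedSkew, Instant outputTs) {
        // With the default allowed skew of 0 ms, lowerBound == inputTs.
        Instant lowerBound = inputTs.minus(allowedSkew);
        if (outputTs.isBefore(lowerBound) || outputTs.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
          throw new IllegalArgumentException(
              "Cannot output with timestamp " + outputTs
                  + ". Output timestamps must be no earlier than " + lowerBound
                  + " and no later than " + BoundedWindow.TIMESTAMP_MAX_VALUE);
        }
      }

      public static void main(String[] args) {
        // Same shape as the failure in the trace: the buffered element's original
        // timestamp (10:43:38) is older than the current input's (10:43:43).
        checkTimestamp(
            Instant.parse("2022-03-07T10:43:43.562Z"),
            Duration.ZERO,
            Instant.parse("2022-03-07T10:43:38.640Z"));
      }
    }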
>>>>> I've bisected it, and 2.34 works fine; 2.35 is the first version where this
>>>>> breaks, and it seems like the code in the trace is largely added by the PR
>>>>> linked above. The error usually claims a skew of a few seconds, but obviously
>>>>> I can't override getAllowedTimestampSkew() on the internal Elastic DoFn, and
>>>>> it's marked deprecated anyway.
>>>>>
>>>>> I'm happy to raise a JIRA, but I'm not 100% sure what the code was intending
>>>>> to fix, and additionally, I'd also be happy if someone else can reproduce this
>>>>> or knows of similar reports. What we're doing doesn't feel like *that* uncommon
>>>>> a scenario, so I would have thought someone else would have hit this by now.
>>>>>
>>>>> Best,
>>>>> Emils
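Tying this back to the proposed fix in [1]: the flush path in the trace reaches checkTimestamp because it goes through ProcessContext#outputWithTimestamp with a timestamp taken from the buffered input rather than the current one. A rough sketch of the distinction, using a hypothetical buffering DoFn (FlushFromProcessElementFn is my own stand-in, not ElasticsearchIO's real code; the actual change is in the linked PR):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.beam.sdk.transforms.DoFn;
    import org.joda.time.Instant;

    // Hypothetical buffering DoFn used only to illustrate the difference between
    // the two output calls; not ElasticsearchIO's implementation.
    class FlushFromProcessElementFn extends DoFn<String, String> {
      private final List<String> buffer = new ArrayList<>();
      private final List<Instant> bufferedTimestamps = new ArrayList<>();

      @ProcessElement
      public void processElement(ProcessContext c) {
        buffer.add(c.element());
        bufferedTimestamps.add(c.timestamp());
        if (buffer.size() >= 1000) {
          for (int i = 0; i < buffer.size(); i++) {
            // Throws the IllegalArgumentException above whenever a buffered
            // timestamp is older than the current input's (allowed skew is 0):
            // c.outputWithTimestamp(buffer.get(i), bufferedTimestamps.get(i));

            // Passes validation: the output inherits the current input element's
            // timestamp and window, which is what the PR switches to.
            c.output(buffer.get(i));
          }
          buffer.clear();
          bufferedTimestamps.clear();
        }
      }
    }

The trade-off is that flushed results then carry the flushing element's timestamp and window rather than the ones they were buffered with, which is the windowing concern discussed earlier in the thread.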