x-post from the associated Jira ticket[0]

Fortunately/unfortunately this same issue struck me as well, and I opened a
PR[1] to use `ProcessContext#output` rather than
`ProcessContext#outputWithTimestamp`.  I believe that should resolve this
issue; it has for me when running jobs with a vendored SDK that includes the
change.  Do folks feel this change should be cherry-picked into 2.37.0?

The change also prompted a question to the mailing list[2] about the
difference in skew validation between ProcessContext and FinishBundleContext
(where, as I understand it, there is no way to compute skew).
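
For anyone skimming, here is a minimal sketch of the lower-bound rule that the
quoted error message describes, in plain Java with no Beam dependency. The
class and method names are made up for illustration and this is not the
SimpleDoFnRunner code; only the two timestamps and the 0 ms skew are taken
from the trace quoted below, and the upper bound mentioned in the message is
not modeled.

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative model of the skew rule from the quoted error message.
// Hypothetical names; NOT Beam's actual implementation.
class SkewCheckSketch {

  // An output timestamp passes if it is no earlier than
  // (timestamp of the current input - allowed skew).
  static boolean isWithinSkew(Instant output, Instant currentInput, Duration allowedSkew) {
    return !output.isBefore(currentInput.minus(allowedSkew));
  }

  public static void main(String[] args) {
    // Timestamps copied from the error message in the thread.
    Instant buffered = Instant.parse("2022-03-07T10:43:38.640Z");
    Instant current = Instant.parse("2022-03-07T10:43:43.562Z");

    // Flushing a buffered element with its own, older timestamp: rejected.
    System.out.println(isWithinSkew(buffered, current, Duration.ZERO)); // false

    // Emitting with the current element's timestamp: accepted.
    System.out.println(isWithinSkew(current, current, Duration.ZERO)); // true
  }
}
```

Since `ProcessContext#output` emits with the timestamp of the current input
element, it corresponds to the second call above, which is why I expect the
switch away from `outputWithTimestamp` to avoid the exception.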

[0] https://issues.apache.org/jira/browse/BEAM-14064

[1] https://github.com/apache/beam/pull/16744

[2] https://lists.apache.org/thread/33kj1yjmn6kkvpl4vz02vnfyn2bpzycp

On Mon, Mar 7, 2022 at 12:41 PM Luke Cwik <lc...@google.com> wrote:

> This is specifically a case where @ProcessElement saw window X for
> element X0 and buffered it in memory, and then, when processing window Y
> and element Y0, wanted to flush the previously buffered element X0. This all
> occurred as part of the same bundle.
>
> In general, yes, outputting to an earlier window is problematic.
>
> On Mon, Mar 7, 2022 at 9:32 AM Reuven Lax <re...@google.com> wrote:
>
>> Outputting to an earlier window is problematic, as the watermark can never
>> be correct if a DoFn can move time backwards arbitrarily.
>>
>> On Mon, Mar 7, 2022 at 9:01 AM Luke Cwik <lc...@google.com> wrote:
>>
>>> A good question would be whether I should be able to output to a
>>> different window from the current @ProcessElement call, as we can from
>>> @FinishBundle, to handle these buffering scenarios.
>>>
>>> On Mon, Mar 7, 2022 at 8:53 AM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> The issue is that ElasticsearchIO is collecting results from elements
>>>> in window X and then trying to output them in window Y when flushing the
>>>> batch. This exposed a bug where buffered elements were being output as
>>>> part of a different window than the one that produced them.
>>>>
>>>> This became visible because validation was added recently to ensure
>>>> that, when the pipeline is processing elements in window X, any output
>>>> with an explicit timestamp is valid for window X. Note that this
>>>> validation only occurs in @ProcessElement, since output there is
>>>> associated with the current window of the input element being processed.
>>>>
>>>> It is ok to do this in @FinishBundle since there is no existing
>>>> windowing context, and when you output, that element is assigned to an
>>>> appropriate window.
>>>>
>>>> Filed https://issues.apache.org/jira/browse/BEAM-14064
>>>>
>>>> On Mon, Mar 7, 2022 at 7:44 AM Emils Solmanis <emils.solma...@rvu.co.uk>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>> I think we’re hitting a regression in ElasticsearchIO batch writing.
>>>>>
>>>>> We’ve bisected it to being introduced in 2.35.0, and I’m reasonably
>>>>> certain it’s this PR https://github.com/apache/beam/pull/15381
>>>>>
>>>>> Our scenario is pretty trivial: we read off Pubsub and write to
>>>>> Elastic in a streaming job. The configs for the source and sink are,
>>>>> respectively,
>>>>>
>>>>> pipeline.apply(
>>>>>             PubsubIO.readStrings().fromSubscription(subscription)
>>>>>         ).apply(ParseJsons.of(OurObject::class.java))
>>>>>             .setCoder(KryoCoder.of())
>>>>>
>>>>> and
>>>>>
>>>>> ElasticsearchIO.write()
>>>>>             .withUseStatefulBatches(true)
>>>>>             .withMaxParallelRequestsPerWindow(1)
>>>>>             .withMaxBufferingDuration(Duration.standardSeconds(30))
>>>>>             // 5 bytes -> KiB -> MiB, so 5 MiB
>>>>>             .withMaxBatchSizeBytes(5L * 1024 * 1024)
>>>>>             // # of docs
>>>>>             .withMaxBatchSize(1000)
>>>>>             .withConnectionConfiguration(
>>>>>                 ElasticsearchIO.ConnectionConfiguration.create(
>>>>>                     arrayOf(host),
>>>>>                     "fubar",
>>>>>                     "_doc"
>>>>>                 ).withConnectTimeout(5000)
>>>>>                     .withSocketTimeout(30000)
>>>>>             )
>>>>>             .withRetryConfiguration(
>>>>>                 ElasticsearchIO.RetryConfiguration.create(
>>>>>                     10,
>>>>>                     // the duration is wall clock, against the connection and socket timeouts specified
>>>>>                     // above. I.e., 10 x 30s is gonna be more than 3 minutes, so if we're getting
>>>>>                     // 10 socket timeouts in a row, this would ignore the "10" part and terminate
>>>>>                     // after 6. The idea is that in a mixed failure mode, you'd get different timeouts
>>>>>                     // of different durations, and on average 10 x fails < 4m.
>>>>>                     // That said, 4m is arbitrary, so adjust as and when needed.
>>>>>                     Duration.standardMinutes(4)
>>>>>                 )
>>>>>             )
>>>>>             .withIdFn { f: JsonNode -> f["id"].asText() }
>>>>>             .withIndexFn { f: JsonNode -> f["schema_name"].asText() }
>>>>>             .withIsDeleteFn { f: JsonNode -> f["_action"].asText("noop") == "delete" }
>>>>>
>>>>> We recently tried upgrading from 2.33 to 2.36 and immediately hit a bug
>>>>> in the consumer, due to alleged time skew, specifically
>>>>>
>>>>> 2022-03-07 10:48:37.886 GMT Error message from worker:
>>>>> java.lang.IllegalArgumentException: Cannot output with timestamp
>>>>> 2022-03-07T10:43:38.640Z. Output timestamps must be no earlier than the
>>>>> timestamp of the current input (2022-03-07T10:43:43.562Z) minus the allowed
>>>>> skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the
>>>>> DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:446)
>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:422)
>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn$ProcessContextAdapter.output(ElasticsearchIO.java:2364)
>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2404)
>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.addAndMaybeFlush(ElasticsearchIO.java:2419)
>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOStatefulFn.processElement(ElasticsearchIO.java:2300)
>>>>>
>>>>> I’ve bisected it and 2.34 works fine, 2.35 is the first version this
>>>>> breaks, and it seems like the code in the trace is largely added by the PR
>>>>> linked above. The error usually claims a skew of a few seconds, but
>>>>> obviously I can’t override getAllowedTimestampSkew() on the internal
>>>>> Elastic DoFn, and it’s marked deprecated anyway.
>>>>>
>>>>> I’m happy to raise a JIRA but I’m not 100% sure what the code was
>>>>> intending to fix, and additionally, I’d also be happy if someone else can
>>>>> reproduce this or knows of similar reports. I feel like what we’re doing
>>>>> is not *that* uncommon a scenario, so I would have thought someone else
>>>>> would have hit this by now.
>>>>>
>>>>> Best,
>>>>> Emils
>>>>>
>>>>
