Thanks Jan, it's interesting to read about the handling of timestamps in
cases employing a buffering pattern.  In the case of the ES write
transform, buffered data could be output from ProcessElement or
FinishBundle.  It's the case where data is output from ProcessElement that
produces the error reported at the start of this thread.  Based on what you
described, it sounds like the PR[1] I made to "fix" this issue is actually
fixing it by transitioning data from being late to being on time, and by
outputting all buffered data into a non-deterministic window: whichever
window is current when the output heuristic (number of elements buffered or
max time since last output) is satisfied.  It seems there are two issues as
a result:

1. The window to which input elements belong is not being preserved.
Presumably we want IOs to leave an element's window unaltered?
2. The system's watermark might be incorrect, given that buffered data is
assumed to have been processed and the watermark is allowed to pass it?

It would definitely add complexity to the IO to employ timers to address
this, but if that's the preferred (or only) solution, I'll put some thought
into how to implement it.
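
As a starting point, here is a rough sketch of what I understand the
timer-based approach to look like, assuming a stateful DoFn over keyed
input (all names are hypothetical; this is not the actual ElasticsearchIO
code): buffer elements in state, track the earliest buffered timestamp, and
set a processing-time timer whose output timestamp holds the watermark at
that earliest element until the buffer is flushed.

import org.apache.beam.sdk.state.*
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.values.KV
import org.joda.time.Duration
import org.joda.time.Instant

class BufferingWriteFn : DoFn<KV<String, String>, String>() {

    @StateId("buffer")
    private val bufferSpec: StateSpec<BagState<String>> = StateSpecs.bag()

    @StateId("minTs")
    private val minTsSpec: StateSpec<ValueState<Instant>> = StateSpecs.value()

    @TimerId("flush")
    private val flushSpec: TimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME)

    @ProcessElement
    fun process(
        ctx: ProcessContext,
        @StateId("buffer") buffer: BagState<String>,
        @StateId("minTs") minTs: ValueState<Instant>,
        @TimerId("flush") flush: Timer
    ) {
        buffer.add(ctx.element().value)
        // Track the earliest buffered element's timestamp.
        val earliest = minTs.read()
            ?.let { if (ctx.timestamp().isBefore(it)) ctx.timestamp() else it }
            ?: ctx.timestamp()
        minTs.write(earliest)
        // The timer's output timestamp holds the output watermark at the
        // earliest buffered element, so nothing becomes late while buffered.
        flush.withOutputTimestamp(earliest)
            .offset(Duration.standardSeconds(30))
            .setRelative()
    }

    @OnTimer("flush")
    fun onFlush(
        ctx: OnTimerContext,
        @StateId("buffer") buffer: BagState<String>,
        @StateId("minTs") minTs: ValueState<Instant>
    ) {
        // Timers fire per key and window, so the buffered elements are
        // emitted in the window they were buffered in.
        buffer.read().forEach { ctx.output(it) }
        buffer.clear()
        minTs.clear()
    }
}

If that holds up, it would also address issue 1 above: because timers are
scoped per key and window, buffered elements stay in their original window.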

Thanks,
Evan

[1] https://github.com/apache/beam/pull/16744

On Tue, Mar 8, 2022 at 3:47 AM Jan Lukavský <je...@seznam.cz> wrote:

> Ah, sorry, the code flushes in @FinishBundle. Is it allowed to update the
> output watermark while a bundle is being processed? It seems that could
> also cause the "watermark skip" problem, which is definitely an issue (and
> is probably the reason the check fails?).
> On 3/8/22 09:35, Jan Lukavský wrote:
>
> The buffering seems incorrect to me. Whenever there is a buffer, we need
> to make sure we hold the output watermark, otherwise the watermark might
> "jump over" a buffered element transitioning it from "on-time" to "late",
> which would be a correctness bug (we can transition elements only from
> "late" to "on-time", never the other way around). The alternative is to use
> @FinishBundle to do the flushing, but might not be appropriate here.
>
> Currently, the only way to limit the progress of the output watermark is
> by setting a timer whose output timestamp is the timestamp of the
> earliest element in the buffer. There was a thread discussing this in
> more detail [1].
>
>  Jan
>
> [1] https://lists.apache.org/thread/y4n5rnhfmh47orhhr92og2r0plvgopl8
> On 3/7/22 19:54, Evan Galpin wrote:
>
> In general, I'd also really like to improve my understanding and learn
> more about how the employed buffering can cause this skew.  Is it because
> the call to "flush" is happening from a different "current window" than the
> elements were originally buffered from?  I'm actually thinking that the
> PR[1] to "fix" this would have had the side effect of outputting buffered
> elements into the window from which "flush" was called rather than the
> window from which the buffered data originated. I suppose that could be
> problematic, but it should at least satisfy the validation code.
>
> [1] https://github.com/apache/beam/pull/16744
>
> On Mon, Mar 7, 2022 at 1:39 PM Evan Galpin <evan.gal...@gmail.com> wrote:
>
>> x-post from the associated Jira ticket[0]
>>
>>
>> Fortunately/unfortunately this same issue struck me as well, and I opened
>> a PR[1] to use `ProcessContext#output` rather than
>> `ProcessContext#outputWithTimestamp`.  I believe that should resolve this
>> issue; it has for me when running jobs with a vendored SDK that includes
>> the change.  Do folks feel this change should be cherry-picked into 2.37.0?
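>>
>> Roughly, the change is of this shape (paraphrasing with hypothetical
>> names, not the literal diff):
>>
>> // before: re-stamps the flushed result with a buffered timestamp, which
>> // can be earlier than the current input element's timestamp
>> context.outputWithTimestamp(result, bufferedTimestamp)
>>
>> // after: inherits the current input element's timestamp
>> context.output(result)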
>>
>> The change also prompted a question to the mailing list[2] about the
>> difference in skew validation between ProcessContext and
>> FinishBundleContext (where, as I understand it, there is no ability to
>> compute skew).
>>
>> [0] https://issues.apache.org/jira/browse/BEAM-14064
>>
>> [1] https://github.com/apache/beam/pull/16744
>>
>> [2] https://lists.apache.org/thread/33kj1yjmn6kkvpl4vz02vnfyn2bpzycp
>>
>> On Mon, Mar 7, 2022 at 12:41 PM Luke Cwik <lc...@google.com> wrote:
>>
>>> This is specifically a case where @ProcessElement saw window X for
>>> element X0 and buffered it in memory, and then, when processing window Y
>>> and element Y0, wanted to flush the previously buffered element X0. This
>>> all occurred as part of the same bundle.
>>>
>>> In general, yes, outputting to an earlier window is problematic.
>>>
>>> On Mon, Mar 7, 2022 at 9:32 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Outputting to an earlier window is problematic, as the watermark can
>>>> never be correct if a DoFn can move time backwards arbitrarily.
>>>>
>>>> On Mon, Mar 7, 2022 at 9:01 AM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>>> A good question would be: should we be able to output to a different
>>>>> window from the current @ProcessElement call, as we can from
>>>>> @FinishBundle, to handle these buffering scenarios?
>>>>>
>>>>> On Mon, Mar 7, 2022 at 8:53 AM Luke Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> The issue is that ElasticsearchIO is collecting results from elements
>>>>>> in window X and then trying to output them in window Y when flushing
>>>>>> the batch. This exposed a bug where buffered elements were being output
>>>>>> as part of a different window than the one that produced them.
>>>>>>
>>>>>> This became visible because validation was added recently to ensure
>>>>>> that when the pipeline is processing elements in window X, any output
>>>>>> with a timestamp is valid for window X. Note that this validation only
>>>>>> occurs in @ProcessElement, since output there is associated with the
>>>>>> current window of the input element being processed.
>>>>>>
>>>>>> It is OK to do this in @FinishBundle, since there is no existing
>>>>>> windowing context, and when you output, the element is assigned to an
>>>>>> appropriate window.
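>>>>>>
>>>>>> For example (a rough sketch with hypothetical names, not the actual
>>>>>> IO code), output from @FinishBundle carries an explicit timestamp and
>>>>>> window, so each buffered element can be emitted back into the window
>>>>>> it came from:
>>>>>>
>>>>>> @FinishBundle
>>>>>> fun finishBundle(ctx: FinishBundleContext) {
>>>>>>     // 'buffered' is a hypothetical list of (element, timestamp,
>>>>>>     // window) triples captured during @ProcessElement.
>>>>>>     buffered.forEach { (element, timestamp, window) ->
>>>>>>         ctx.output(element, timestamp, window)
>>>>>>     }
>>>>>>     buffered.clear()
>>>>>> }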
>>>>>>
>>>>>> Filed https://issues.apache.org/jira/browse/BEAM-14064
>>>>>>
>>>>>> On Mon, Mar 7, 2022 at 7:44 AM Emils Solmanis <
>>>>>> emils.solma...@rvu.co.uk> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>> I think we’re hitting a regression in ElasticIO batch writing.
>>>>>>>
>>>>>>> We’ve bisected it to being introduced in 2.35.0, and I’m reasonably
>>>>>>> certain it’s this PR https://github.com/apache/beam/pull/15381
>>>>>>>
>>>>>>> Our scenario is pretty trivial: we read off Pubsub and write to
>>>>>>> Elastic in a streaming job. The config for the source and sink,
>>>>>>> respectively, is
>>>>>>>
>>>>>>> pipeline.apply(
>>>>>>>             PubsubIO.readStrings().fromSubscription(subscription)
>>>>>>>         ).apply(ParseJsons.of(OurObject::class.java))
>>>>>>>             .setCoder(KryoCoder.of())
>>>>>>>
>>>>>>> and
>>>>>>>
>>>>>>> ElasticsearchIO.write()
>>>>>>>             .withUseStatefulBatches(true)
>>>>>>>             .withMaxParallelRequestsPerWindow(1)
>>>>>>>             .withMaxBufferingDuration(Duration.standardSeconds(30))
>>>>>>>             // 5 bytes -> KiB -> MiB, so 5 MiB
>>>>>>>             .withMaxBatchSizeBytes(5L * 1024 * 1024)
>>>>>>>             // # of docs
>>>>>>>             .withMaxBatchSize(1000)
>>>>>>>             .withConnectionConfiguration(
>>>>>>>                 ElasticsearchIO.ConnectionConfiguration.create(
>>>>>>>                     arrayOf(host),
>>>>>>>                     "fubar",
>>>>>>>                     "_doc"
>>>>>>>                 ).withConnectTimeout(5000)
>>>>>>>                     .withSocketTimeout(30000)
>>>>>>>             )
>>>>>>>             .withRetryConfiguration(
>>>>>>>                 ElasticsearchIO.RetryConfiguration.create(
>>>>>>>                     10,
>>>>>>>                     // the duration is wall clock, against the
>>>>>>>                     // connection and socket timeouts specified above.
>>>>>>>                     // I.e., 10 x 30s is gonna be more than 3 minutes,
>>>>>>>                     // so if we're getting 10 socket timeouts in a row,
>>>>>>>                     // this would ignore the "10" part and terminate
>>>>>>>                     // after 6. The idea is that in a mixed failure
>>>>>>>                     // mode, you'd get different timeouts of different
>>>>>>>                     // durations, and on average 10 x fails < 4m.
>>>>>>>                     // That said, 4m is arbitrary, so adjust as and
>>>>>>>                     // when needed.
>>>>>>>                     Duration.standardMinutes(4)
>>>>>>>                 )
>>>>>>>             )
>>>>>>>             .withIdFn { f: JsonNode -> f["id"].asText() }
>>>>>>>             .withIndexFn { f: JsonNode -> f["schema_name"].asText() }
>>>>>>>             .withIsDeleteFn { f: JsonNode -> 
>>>>>>> f["_action"].asText("noop") == "delete" }
>>>>>>>
>>>>>>> We recently tried upgrading from 2.33 to 2.36 and immediately hit a
>>>>>>> bug in the consumer, due to alleged time skew, specifically
>>>>>>>
>>>>>>> 2022-03-07 10:48:37.886 GMT Error message from worker:
>>>>>>> java.lang.IllegalArgumentException: Cannot output with timestamp
>>>>>>> 2022-03-07T10:43:38.640Z. Output timestamps must be no earlier than the
>>>>>>> timestamp of the current input (2022-03-07T10:43:43.562Z) minus the
>>>>>>> allowed skew (0 milliseconds) and no later than
>>>>>>> 294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew()
>>>>>>> Javadoc for details on changing the allowed skew.
>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:446)
>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:422)
>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn$ProcessContextAdapter.output(ElasticsearchIO.java:2364)
>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2404)
>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.addAndMaybeFlush(ElasticsearchIO.java:2419)
>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOStatefulFn.processElement(ElasticsearchIO.java:2300)
>>>>>>>
>>>>>>> I’ve bisected it: 2.34 works fine, and 2.35 is the first version where
>>>>>>> this breaks; the code in the trace appears to be largely added by the PR
>>>>>>> linked above. The error usually claims a skew of a few seconds, but
>>>>>>> obviously I can’t override getAllowedTimestampSkew() on the internal
>>>>>>> Elastic DoFn, and it’s marked deprecated anyway.
>>>>>>>
>>>>>>> I’m happy to raise a JIRA, but I’m not 100% sure what the code was
>>>>>>> intending to fix. I’d also be happy if someone else can reproduce this
>>>>>>> or knows of similar reports. I feel like what we’re doing is not *that*
>>>>>>> uncommon a scenario, so I would have thought someone else would have
>>>>>>> hit this by now.
>>>>>>>
>>>>>>> Best,
>>>>>>> Emils
>>>>>>>
>>>>>>
