>
> But I'm not sure if an IO throwing away windows (in favour of Global)
> would be acceptable either.  BigQueryIO#write (streaming) also has a notion
> of "getFailedWrites" with the same intent as what's in ElasticsearchIO;  I
> wonder how that's implemented?
>

It appears that BigQueryIO[1] just re-windows inputs into the global window
and does not concern itself with preserving input windows of elements nor
holding the watermark.  Is that a good precedent to follow?  It would
certainly simplify the implementation of ElasticsearchIO#bulkIO.

[1]
https://github.com/apache/beam/blob/a126adbc6aa73f1e30adfa65a3710f7f69a7ba89/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java#L313-L318

On Wed, Mar 9, 2022 at 10:13 AM Evan Galpin <evan.gal...@gmail.com> wrote:

> Oops, unfortunate misfire before being done writing my prior email.
>
> Thanks Jan for the thoughtful responses!  It definitely sounds as though
> the ElasticsearchIO#write transform would be safer to either hold the
> watermark via similar machinery employed by GroupIntoBatches or possibly
> output everything into a global window; either sounds more safe in terms of
> data correctness than potentially outputting "on time" data as "late"
> data.  But I'm not sure if an IO throwing away windows (in favour of
> Global) would be acceptable either.  BigQueryIO#write (streaming) also has
> a notion of "getFailedWrites" with the same intent as what's in
> ElasticsearchIO;  I wonder how that's implemented?
>
> I'm keen to follow any further discussion on watermark holds for DoFn's
> that implement both @StartBundle and @FinishBundle!
>
> - Evan
>
> On Wed, Mar 9, 2022 at 10:05 AM Evan Galpin <evan.gal...@gmail.com> wrote:
>
>> Got it.  So ProcessContext#ouput has the semantics of "using the window
>> associated with the current @Element, output the data being passed in into
>> that window."  Makes complete sense.
>>
>> On Wed, Mar 9, 2022 at 4:15 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Interesting discussion. :-)
>>>
>>> Answers inline.
>>> On 3/8/22 22:00, Evan Galpin wrote:
>>>
>>> Thanks Jan for confirming that the fix looks alright.  I also found a
>>> PR[1] that appears to be a good case study of the Timer watermark hold
>>> technique that you previously mentioned so I'll study that a bit for my own
>>> understanding and future use.  I was also previously missing the notion
>>> that a singular bundle could contain elements from many disparate Windows,
>>> so that's a great upgrade to my own mental model haha.
>>>
>>> I have a few follow-up questions for my own understanding (hopefully not
>>> off topic).
>>>
>>>    1. Am I understanding correctly that using ProcessContext#output to
>>>    output a given element will preserve the element's input window(s)? Or is
>>>    it the case that when using ProcessContext#output, any buffered elements
>>>    will all be output to whatever window the most recent element processed
>>>    by @ProcessElement belongs to?
>>>
>>> There is no way Beam can distinguish *how* you produced elements emitted
>>> from @ProcessElement (if those are result of some in-memory buffering or
>>> direct result of computation of the single input element currently being
>>> processed). From that it follows, that window(s) associated with the output
>>> will be equal to window(s) of the input element currently processed
>>> (generally, the model allows multiple applications of window function,
>>> which is why window.maxTimestamp is 1 ms shifted backwards, to be still
>>> part of the window itself, so that the window functions based on timestamps
>>> are idempotent).
>>>
>>>
>>>    1. Is it safe to emit outputs from @FinishBundle to windows which
>>>    may be older than the watermark?  I believe this could still be happening
>>>    given the current ElasticsearchIO implementation where any buffered
>>>    elements are output in @FinishBundle using the same window they were
>>>    associated with on input.  Intuitively, that sounds as though it could 
>>> be a
>>>    correctness bug if the watermark had progressed beyond those windows.
>>>
>>> This is great question that probably deserves deeper attention.
>>>
>>>  a) first of all - the fact, that you potentially could output late
>>> data, that previously were not late is _potentially_ a correctness bug, if
>>> you have any downstream logic, that performs any additional grouping and -
>>> most notably - relies on causality being preserved (which is how our
>>> universe works, so this is a natural requirement). Changing element from
>>> "on time" to "late" can result in downstream processing see effect prior to
>>> cause (in event time!, because the watermark move can cause downstream
>>> timers to fire). I wrote something about this in [1]. Generally, if there
>>> is no grouping (or your application logic accounts for the swapping of
>>> cause and effect), then this is not an issue. If you produce a PCollection
>>> for user, then you don't know the user does with it and therefore should
>>> pay attention to it.
>>>
>>>  b) second, this effect would not appear if mid-bundle output watermarks
>>> updates were not possible. I emphasize that I don't know if this is allowed
>>> in the model, but I *suppose* it is (@kenn, @reuven or @luke can you
>>> correct me if I'm wrong?). I have some doubts if it is correct, though. It
>>> seems that it causes generally issues with in-memory buffering and
>>> 'outputWithTimestamp' in stateless DoFns. If a DoFn implements @StartBundle
>>> and @FinishBundle it seems, that we should hold output watermark between
>>> these two calls. But that would turn stateless DoFn into stateful, which is
>>> ... unfortunate. :)
>>>
>>> I hope I made things a little more clear rather than more obfuscated. :)
>>>
>>>  Jan
>>>
>>> [1] https://twitter.com/janl_apache/status/1478757956263071745
>>>
>>>
>>> It's definitely interesting to think about the idea of enforcing
>>> watermark update only between bundles, but presumably that would mean quite
>>> extensive alterations.
>>>
>>> - Evan
>>>
>>> [1] https://github.com/apache/beam/pull/15249
>>>
>>> On Tue, Mar 8, 2022 at 10:29 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi Evan,
>>>>
>>>> the fix looks good to me, as long as the timestamp of the buffered data
>>>> need to be preserved downstream. Generally I think it *should* be possible
>>>> to output in-memory buffered data in @ProcessElement (and @FinishBundle),
>>>> the case where you need timers is when your buffer needs to span multiple
>>>> bundles. It that case it also must be stored in regular state and not
>>>> in-memory.
>>>>
>>>> It seems to me that there is some logical gap in how we handle the
>>>> per-bundle buffering. I see two possibilities:
>>>>
>>>>  a) either we allow in the model to change the output watermark *while*
>>>> processing a bundle, - in which case it is logical requirement to have
>>>> output timestamp from @ProcessElement no earlier than timestamp of the
>>>> current element (because that way we preserve the "on time", "late" status
>>>> of the current element, we don't swap anything), or
>>>>
>>>>  b) we enforce output watermark update only in-between of bundles - in
>>>> that case the requirement could be relaxed that the output timestamp from
>>>> @ProcessElement might be no earlier than the minimum of timestamps inside
>>>> the bundle
>>>>
>>>> I'm afraid that our current position is a). But in that case it is
>>>> somewhat questionable if it is semantically correct to use
>>>> outputWithTimestamp() in @ProcessElement of stateless DoFn at all. It can
>>>> move timestamps only to future instant (and inside same window!), which has
>>>> little semantic meaning to me. Looks more error prone than useful.
>>>>
>>>>  Jan
>>>> On 3/8/22 15:53, Evan Galpin wrote:
>>>>
>>>> Thanks Jan, it's interesting to read about the handling of timestamp in
>>>> cases employing a buffering pattern.  In the case of the ES write
>>>> transform, buffered data could be output from ProcessElement or
>>>> FinishBundle.  It's the case where data is output from ProcessElement that
>>>> the error reported at the start of this thread shows up.  Based on what you
>>>> described, it sounds like the PR[1] I made to "fix" this issue is actually
>>>> fixing it by transitioning data from being late to being on time and
>>>> outputting all buffered data into a non-deterministic window where the
>>>> output heuristic is later satisfied (number of elements buffered or max
>>>> time since last output).  It seems there's 2 issues as a result:
>>>>
>>>> 1. The window to which input elements belong is not being preserved.
>>>> Presumably we want IOs to leave an element's window unaltered?
>>>> 2. The watermark of the system might be incorrect given that buffered
>>>> data is assumed processed and allows the watermark to pass?
>>>>
>>>> It would definitely add complexity to the IO to employ timers to
>>>> address this, but if that's the preferred or only solution I'll put some
>>>> thought into how to implement that solution.
>>>>
>>>> Thanks,
>>>> Evan
>>>>
>>>> [1] https://github.com/apache/beam/pull/16744
>>>>
>>>> On Tue, Mar 8, 2022 at 3:47 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Ah, sorry, the code flushes in @FinishBundle. Is it allowed to update
>>>>> output watermark while a bundle is being processed? That seems it could
>>>>> also cause the "watermark skip" problem, which is definitely an issue (and
>>>>> is probably the reason why the check fails?).
>>>>> On 3/8/22 09:35, Jan Lukavský wrote:
>>>>>
>>>>> The buffering seems incorrect to me. Whenever there is a buffer, we
>>>>> need to make sure we hold the output watermark, otherwise the watermark
>>>>> might "jump over" a buffered element transitioning it from "on-time" to
>>>>> "late", which would be a correctness bug (we can transition elements only
>>>>> from "late" to "on-time", never the other way around). The alternative is
>>>>> to use @FinishBundle to do the flushing, but might not be appropriate 
>>>>> here.
>>>>>
>>>>> Currently, the only way to limit the progress of output watermark is
>>>>> by setting a timer with output timestamp that has the timestamp of the
>>>>> earliest element in the buffer. There was a thread that was discussing 
>>>>> this
>>>>> in more details [1].
>>>>>
>>>>>  Jan
>>>>>
>>>>> [1] https://lists.apache.org/thread/y4n5rnhfmh47orhhr92og2r0plvgopl8
>>>>> On 3/7/22 19:54, Evan Galpin wrote:
>>>>>
>>>>> In general, I'd also really like to improve my understanding and learn
>>>>> more about how the employed buffering can cause this skew.  Is it because
>>>>> the call to "flush" is happening from a different "current window" than 
>>>>> the
>>>>> elements were originally buffered from?  I'm actually thinking that the
>>>>> PR[1] to "fix" this would have had the side effect of outputting buffered
>>>>> elements into the window from which "flush" was called rather than the
>>>>> window from which the buffered data originated. I suppose that could be
>>>>> problematic, but should at least satisfy the validation code.
>>>>>
>>>>> [1] https://github.com/apache/beam/pull/16744
>>>>>
>>>>> On Mon, Mar 7, 2022 at 1:39 PM Evan Galpin <evan.gal...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> x-post from the associated Jira ticket[0]
>>>>>>
>>>>>>
>>>>>> Fortunately/unfortunately this same issue struck me as well, and I
>>>>>> opened a PR[1] to use `ProcessContext#output` rather than
>>>>>> `ProcessContext#outputWithTimestamp`.  I believe that should resolve this
>>>>>> issue, it has for me when running jobs with a vendored SDK with that 
>>>>>> change
>>>>>> included.  Do folks feel this change to be cherry-picked into 2.37.0?
>>>>>>
>>>>>> The change also prompted a question to the mailing list[2] about
>>>>>> skew validation difference between ProcessContext vs FinishBundleContext
>>>>>> (where there is no ability to compute skew as I understand it).
>>>>>>
>>>>>> [0] https://issues.apache.org/jira/browse/BEAM-14064
>>>>>>
>>>>>> [1] https://github.com/apache/beam/pull/16744
>>>>>>
>>>>>> [2] https://lists.apache.org/thread/33kj1yjmn6kkvpl4vz02vnfyn2bpzycp
>>>>>>
>>>>>> On Mon, Mar 7, 2022 at 12:41 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>>> This is specifically a case where the @ProcessElement saw window X
>>>>>>> for element X0 and buffered it into memory and then when processing 
>>>>>>> window
>>>>>>> Y and element Y0 wanted to flush previously buffered element X0. This 
>>>>>>> all
>>>>>>> occurred as part of the same bundle.
>>>>>>>
>>>>>>> In general, yes, outputting to an earlier window is problematic.
>>>>>>>
>>>>>>> On Mon, Mar 7, 2022 at 9:32 AM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> Outputting to an earlier window is problematic,as the watermark can
>>>>>>>> never be correct if a DoFn can move time backwards arbitrarily.
>>>>>>>>
>>>>>>>> On Mon, Mar 7, 2022 at 9:01 AM Luke Cwik <lc...@google.com> wrote:
>>>>>>>>
>>>>>>>>> A good question would be should I be able to output to a different
>>>>>>>>> window from the current @ProcessElement call, like what we can do from
>>>>>>>>> @FinishBundle to handle these buffering scenarios.
>>>>>>>>>
>>>>>>>>> On Mon, Mar 7, 2022 at 8:53 AM Luke Cwik <lc...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> The issue is that ElasticsearchIO is collecting results from
>>>>>>>>>> elements in window X and then trying to output them in window Y when
>>>>>>>>>> flushing the batch. This exposed a bug where elements that were being
>>>>>>>>>> buffered were being output as part of a different window than what 
>>>>>>>>>> the
>>>>>>>>>> window that produced them was.
>>>>>>>>>>
>>>>>>>>>> This became visible because validation was added recently to
>>>>>>>>>> ensure that when the pipeline is processing elements in window X that
>>>>>>>>>> output with a timestamp is valid for window X. Note that this 
>>>>>>>>>> validation
>>>>>>>>>> only occurs in @ProcessElement since output is associated with the 
>>>>>>>>>> current
>>>>>>>>>> window with the input element that is being processed.
>>>>>>>>>>
>>>>>>>>>> It is ok to do this in @FinishBundle since there is no existing
>>>>>>>>>> windowing context and when you output that element is assigned to an
>>>>>>>>>> appropriate window.
>>>>>>>>>>
>>>>>>>>>> Filed https://issues.apache.org/jira/browse/BEAM-14064
>>>>>>>>>>
>>>>>>>>>> On Mon, Mar 7, 2022 at 7:44 AM Emils Solmanis <
>>>>>>>>>> emils.solma...@rvu.co.uk> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi all,
>>>>>>>>>>> I think we’re hitting a regression in ElasticIO batch writing.
>>>>>>>>>>>
>>>>>>>>>>> We’ve bisected it to being introduced in 2.35.0, and I’m
>>>>>>>>>>> reasonably certain it’s this PR
>>>>>>>>>>> https://github.com/apache/beam/pull/15381
>>>>>>>>>>>
>>>>>>>>>>> Our scenario is pretty trivial, we read off Pubsub and write to
>>>>>>>>>>> Elastic in a streaming job, the config for the source and sink is
>>>>>>>>>>> respectively
>>>>>>>>>>>
>>>>>>>>>>> pipeline.apply(
>>>>>>>>>>>             PubsubIO.readStrings().fromSubscription(subscription)
>>>>>>>>>>>         ).apply(ParseJsons.of(OurObject::class.java))
>>>>>>>>>>>             .setCoder(KryoCoder.of())
>>>>>>>>>>>
>>>>>>>>>>> and
>>>>>>>>>>>
>>>>>>>>>>> ElasticsearchIO.write()
>>>>>>>>>>>             .withUseStatefulBatches(true)
>>>>>>>>>>>             .withMaxParallelRequestsPerWindow(1)
>>>>>>>>>>>             .withMaxBufferingDuration(Duration.standardSeconds(30))
>>>>>>>>>>>             // 5 bytes **> KiB **> MiB, so 5 MiB
>>>>>>>>>>>             .withMaxBatchSizeBytes(5L * 1024 * 1024)
>>>>>>>>>>>             // # of docs
>>>>>>>>>>>             .withMaxBatchSize(1000)
>>>>>>>>>>>             .withConnectionConfiguration(
>>>>>>>>>>>                 ElasticsearchIO.ConnectionConfiguration.create(
>>>>>>>>>>>                     arrayOf(host),
>>>>>>>>>>>                     "fubar",
>>>>>>>>>>>                     "_doc"
>>>>>>>>>>>                 ).withConnectTimeout(5000)
>>>>>>>>>>>                     .withSocketTimeout(30000)
>>>>>>>>>>>             )
>>>>>>>>>>>             .withRetryConfiguration(
>>>>>>>>>>>                 ElasticsearchIO.RetryConfiguration.create(
>>>>>>>>>>>                     10,
>>>>>>>>>>>                     // the duration is wall clock, against the 
>>>>>>>>>>> connection and socket timeouts specified
>>>>>>>>>>>                     // above. I.e., 10 x 30s is gonna be more than 
>>>>>>>>>>> 3 minutes, so if we're getting
>>>>>>>>>>>                     // 10 socket timeouts in a row, this would 
>>>>>>>>>>> ignore the "10" part and terminate
>>>>>>>>>>>                     // after 6. The idea is that in a mixed failure 
>>>>>>>>>>> mode, you'd get different timeouts
>>>>>>>>>>>                     // of different durations, and on average 10 x 
>>>>>>>>>>> fails < 4m.
>>>>>>>>>>>                     // That said, 4m is arbitrary, so adjust as and 
>>>>>>>>>>> when needed.
>>>>>>>>>>>                     Duration.standardMinutes(4)
>>>>>>>>>>>                 )
>>>>>>>>>>>             )
>>>>>>>>>>>             .withIdFn { f: JsonNode -> f["id"].asText() }
>>>>>>>>>>>             .withIndexFn { f: JsonNode -> f["schema_name"].asText() 
>>>>>>>>>>> }
>>>>>>>>>>>             .withIsDeleteFn { f: JsonNode -> 
>>>>>>>>>>> f["_action"].asText("noop") == "delete" }
>>>>>>>>>>>
>>>>>>>>>>> We recently tried upgrading 2.33 to 2.36 and immediately hit a
>>>>>>>>>>> bug in the consumer, due to alleged time skew, specifically
>>>>>>>>>>>
>>>>>>>>>>> 2022-03-07 10:48:37.886 GMTError message from worker: 
>>>>>>>>>>> java.lang.IllegalArgumentException: Cannot output with timestamp 
>>>>>>>>>>> 2022-03-07T10:43:38.640Z. Output timestamps must be no earlier than 
>>>>>>>>>>> the timestamp of the
>>>>>>>>>>> current input (2022-03-07T10:43:43.562Z) minus the allowed skew (0 
>>>>>>>>>>> milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the 
>>>>>>>>>>> DoFn#getAllowedTimestampSkew() Javadoc
>>>>>>>>>>> for details on changing the allowed skew.
>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:446)
>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:422)
>>>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn$ProcessContextAdapter.output(ElasticsearchIO.java:2364)
>>>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2404)
>>>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.addAndMaybeFlush(ElasticsearchIO.java:2419)
>>>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOStatefulFn.processElement(ElasticsearchIO.java:2300)
>>>>>>>>>>>
>>>>>>>>>>> I’ve bisected it and 2.34 works fine, 2.35 is the first version
>>>>>>>>>>> this breaks, and it seems like the code in the trace is largely 
>>>>>>>>>>> added by
>>>>>>>>>>> the PR linked above. The error usually claims a skew of a few 
>>>>>>>>>>> seconds, but
>>>>>>>>>>> obviously I can’t override getAllowedTimestampSkew() on the
>>>>>>>>>>> internal Elastic DoFn, and it’s marked deprecated anyway.
>>>>>>>>>>>
>>>>>>>>>>> I’m happy to raise a JIRA but I’m not 100% sure what the code
>>>>>>>>>>> was intending to fix, and additionally, I’d also be happy if 
>>>>>>>>>>> someone else
>>>>>>>>>>> can reproduce this or knows of similar reports. I feel like what 
>>>>>>>>>>> we’re
>>>>>>>>>>> doing is not *that* uncommon a scenario, so I would have
>>>>>>>>>>> thought someone else would have hit this by now.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Emils
>>>>>>>>>>>
>>>>>>>>>>

Reply via email to