Ah, sorry, the code flushes in @FinishBundle. Is it allowed to update the output watermark while a bundle is being processed? That seems like it could also cause the "watermark skip" problem, which is definitely an issue (and is probably the reason why the check fails?).

On 3/8/22 09:35, Jan Lukavský wrote:

The buffering seems incorrect to me. Whenever there is a buffer, we need to make sure we hold the output watermark, otherwise the watermark might "jump over" a buffered element, transitioning it from "on-time" to "late", which would be a correctness bug (we can transition elements only from "late" to "on-time", never the other way around). The alternative is to use @FinishBundle to do the flushing, but that might not be appropriate here.

Currently, the only way to limit the progress of the output watermark is by setting a timer with an output timestamp equal to the timestamp of the earliest element in the buffer. There was a thread discussing this in more detail [1].
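
To make the "jump over" concrete, here is a toy model in plain Java. It is not Beam code: the class and method names are made up for illustration, and it only assumes that a hold pins the output watermark at the earliest buffered timestamp. In a real DoFn the hold would come from a timer set with an output timestamp (in the Java SDK I believe that is Timer#withOutputTimestamp).

```java
import java.time.Instant;
import java.util.PriorityQueue;

/** Toy model (hypothetical, not Beam API) of a watermark hold over a buffer. */
public class WatermarkHoldSketch {
    // Timestamps of buffered, not-yet-emitted elements; the earliest is at the head.
    private final PriorityQueue<Instant> buffer = new PriorityQueue<>();

    public void buffer(Instant elementTimestamp) {
        buffer.add(elementTimestamp);
    }

    /** Output watermark when it is held at the earliest buffered element. */
    public Instant outputWatermarkWithHold(Instant inputWatermark) {
        Instant earliest = buffer.peek();
        if (earliest == null || inputWatermark.isBefore(earliest)) {
            return inputWatermark;
        }
        return earliest;
    }

    /** Output watermark when nothing holds it: it simply follows the input. */
    public Instant outputWatermarkWithoutHold(Instant inputWatermark) {
        return inputWatermark;
    }

    /** An element is late once the watermark has passed its timestamp. */
    public static boolean isLate(Instant elementTimestamp, Instant outputWatermark) {
        return elementTimestamp.isBefore(outputWatermark);
    }

    public static void main(String[] args) {
        WatermarkHoldSketch sketch = new WatermarkHoldSketch();
        Instant x0 = Instant.parse("2022-03-07T10:43:38.640Z");
        Instant inputWatermark = Instant.parse("2022-03-07T10:43:43.562Z");
        sketch.buffer(x0);

        // prints "without hold: late=true"
        System.out.println("without hold: late="
            + isLate(x0, sketch.outputWatermarkWithoutHold(inputWatermark)));
        // prints "with hold: late=false"
        System.out.println("with hold: late="
            + isLate(x0, sketch.outputWatermarkWithHold(inputWatermark)));
    }
}
```

Without the hold, the buffered element goes from on-time to late while it sits in the buffer; with the hold it stays on-time until it is flushed.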

 Jan

[1] https://lists.apache.org/thread/y4n5rnhfmh47orhhr92og2r0plvgopl8

On 3/7/22 19:54, Evan Galpin wrote:
In general, I'd also really like to improve my understanding and learn more about how the employed buffering can cause this skew.  Is it because the call to "flush" is happening from a different "current window" than the elements were originally buffered from?  I'm actually thinking that the PR[1] to "fix" this would have had the side effect of outputting buffered elements into the window from which "flush" was called rather than the window from which the buffered data originated. I suppose that could be problematic, but should at least satisfy the validation code.

[1] https://github.com/apache/beam/pull/16744
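
For my own understanding, here is a simplified sketch of the lower-bound part of the skew validation as I read it from the error message in the original report. It is not the actual SimpleDoFnRunner code; the class and method names are hypothetical, and the upper bound on the timestamp is left out. The timestamps are the ones from the reported error.

```java
import java.time.Duration;
import java.time.Instant;

/** Simplified, hypothetical model of the output-timestamp skew check. */
public class SkewCheckSketch {
    /**
     * An output timestamp is accepted if it is no earlier than the timestamp
     * of the current input minus the allowed skew.
     */
    public static boolean isOutputTimestampAllowed(
            Instant output, Instant currentInput, Duration allowedSkew) {
        return !output.isBefore(currentInput.minus(allowedSkew));
    }

    public static void main(String[] args) {
        // X0 was buffered earlier; Y0 is the element whose processing triggers the flush.
        Instant bufferedX0 = Instant.parse("2022-03-07T10:43:38.640Z");
        Instant currentY0 = Instant.parse("2022-03-07T10:43:43.562Z");

        // prints "false": X0 is about 4.9s older than Y0 and the allowed skew is 0
        System.out.println(isOutputTimestampAllowed(bufferedX0, currentY0, Duration.ZERO));
        // prints "true": a 5s skew would cover the gap
        System.out.println(isOutputTimestampAllowed(bufferedX0, currentY0, Duration.ofSeconds(5)));
    }
}
```

So any flush from @ProcessElement that emits a buffered element with its original, older timestamp trips the check as soon as the triggering element is newer.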


On Mon, Mar 7, 2022 at 1:39 PM Evan Galpin <evan.gal...@gmail.com> wrote:

    x-post from the associated Jira ticket[0]


    Fortunately/unfortunately this same issue struck me as well, and
    I opened a PR[1] to use `ProcessContext#output` rather than
    `ProcessContext#outputWithTimestamp`.  I believe that should
    resolve this issue; it has for me when running jobs with a
    vendored SDK with that change included.  Do folks feel this
    change should be cherry-picked into 2.37.0?

    The change also prompted a question to the mailing list[2] about
    skew validation difference between ProcessContext vs
    FinishBundleContext (where there is no ability to compute skew as
    I understand it).

    [0] https://issues.apache.org/jira/browse/BEAM-14064

    [1] https://github.com/apache/beam/pull/16744

    [2] https://lists.apache.org/thread/33kj1yjmn6kkvpl4vz02vnfyn2bpzycp


    On Mon, Mar 7, 2022 at 12:41 PM Luke Cwik <lc...@google.com> wrote:

        This is specifically a case where the @ProcessElement saw
        window X for element X0 and buffered it into memory and then
        when processing window Y and element Y0 wanted to flush
        previously buffered element X0. This all occurred as part of
        the same bundle.

        In general, yes, outputting to an earlier window is problematic.

        On Mon, Mar 7, 2022 at 9:32 AM Reuven Lax <re...@google.com>
        wrote:

            Outputting to an earlier window is problematic, as the
            watermark can never be correct if a DoFn can move time
            backwards arbitrarily.

            On Mon, Mar 7, 2022 at 9:01 AM Luke Cwik
            <lc...@google.com> wrote:

                A good question would be: should I be able to output
                to a different window from the
                current @ProcessElement call, like what we can do
                from @FinishBundle to handle these buffering scenarios?

                On Mon, Mar 7, 2022 at 8:53 AM Luke Cwik
                <lc...@google.com> wrote:

                    The issue is that ElasticsearchIO is collecting
                    results from elements in window X and then trying
                    to output them in window Y when flushing the
                    batch. This exposed a bug where elements that
                    were being buffered were being output as part of
                    a different window than the window that
                    produced them.

                    This became visible because validation was added
                    recently to ensure that, when the pipeline is
                    processing elements in window X, any output with
                    a timestamp is valid for window X. Note that this
                    validation only occurs in @ProcessElement since
                    output is associated with the current window of
                    the input element that is being processed.

                    It is ok to do this in @FinishBundle since there
                    is no existing windowing context, and when you
                    output, that element is assigned to an appropriate
                    window.

                    Filed
                    https://issues.apache.org/jira/browse/BEAM-14064

                    On Mon, Mar 7, 2022 at 7:44 AM Emils Solmanis
                    <emils.solma...@rvu.co.uk> wrote:

                        Hi all,
                        I think we’re hitting a regression in
                        ElasticIO batch writing.

                        We’ve bisected it to being introduced in
                        2.35.0, and I’m reasonably certain it’s this
                        PR https://github.com/apache/beam/pull/15381

                        Our scenario is pretty trivial: we read off
                        Pubsub and write to Elastic in a streaming
                        job. The configs for the source and sink are,
                        respectively,

                            pipeline.apply(
                                PubsubIO.readStrings().fromSubscription(subscription)
                            ).apply(ParseJsons.of(OurObject::class.java))
                                .setCoder(KryoCoder.of())

                        and

                            ElasticsearchIO.write()
                                .withUseStatefulBatches(true)
                                .withMaxParallelRequestsPerWindow(1)
                                .withMaxBufferingDuration(Duration.standardSeconds(30))
                                // 5 bytes -> KiB -> MiB, so 5 MiB
                                .withMaxBatchSizeBytes(5L * 1024 * 1024)
                                // # of docs
                                .withMaxBatchSize(1000)
                                .withConnectionConfiguration(
                                    ElasticsearchIO.ConnectionConfiguration.create(
                                        arrayOf(host), "fubar", "_doc"
                                    ).withConnectTimeout(5000)
                                        .withSocketTimeout(30000)
                                )
                                .withRetryConfiguration(
                                    ElasticsearchIO.RetryConfiguration.create(
                                        10,
                                        // the duration is wall clock, against the connection and socket timeouts specified
                                        // above. I.e., 10 x 30s is gonna be more than 3 minutes, so if we're getting
                                        // 10 socket timeouts in a row, this would ignore the "10" part and terminate
                                        // after 6. The idea is that in a mixed failure mode, you'd get different timeouts
                                        // of different durations, and on average 10 x fails < 4m.
                                        // That said, 4m is arbitrary, so adjust as and when needed.
                                        Duration.standardMinutes(4)
                                    )
                                )
                                .withIdFn { f: JsonNode -> f["id"].asText() }
                                .withIndexFn { f: JsonNode -> f["schema_name"].asText() }
                                .withIsDeleteFn { f: JsonNode -> f["_action"].asText("noop") == "delete" }

                        We recently tried upgrading 2.33 to 2.36 and
                        immediately hit a bug in the consumer, due to
                        alleged time skew, specifically

                        2022-03-07 10:48:37.886 GMT Error message
                        from worker:
                        java.lang.IllegalArgumentException: Cannot
                        output with timestamp
                        2022-03-07T10:43:38.640Z. Output timestamps
                        must be no earlier than the timestamp of the
                        current input (2022-03-07T10:43:43.562Z)
                        minus the allowed skew (0 milliseconds) and
                        no later than 294247-01-10T04:00:54.775Z. See
                        the DoFn#getAllowedTimestampSkew() Javadoc
                        for details on changing the allowed skew.
                            org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:446)
                            org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:422)
                            org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn$ProcessContextAdapter.output(ElasticsearchIO.java:2364)
                            org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2404)
                            org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.addAndMaybeFlush(ElasticsearchIO.java:2419)
                            org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOStatefulFn.processElement(ElasticsearchIO.java:2300)

                        I’ve bisected it and 2.34 works fine, 2.35 is
                        the first version this breaks, and it seems
                        like the code in the trace is largely added
                        by the PR linked above. The error usually
                        claims a skew of a few seconds, but obviously
                        I can’t override |getAllowedTimestampSkew()|
                        on the internal Elastic DoFn, and it’s marked
                        deprecated anyway.

                        I’m happy to raise a JIRA but I’m not 100%
                        sure what the code was intending to fix, and
                        additionally, I’d also be happy if someone
                        else can reproduce this or knows of similar
                        reports. I feel like what we’re doing is not
                        *that* uncommon a scenario, so I would have
                        thought someone else would have hit this by now.

                        Best,
                        Emils
