Hi Evan,

The fix looks good to me, as long as the timestamp of the buffered data does not need to be preserved downstream. Generally I think it *should* be possible to output in-memory buffered data in @ProcessElement (and @FinishBundle); the case where you need timers is when your buffer needs to span multiple bundles. In that case it also must be stored in regular state and not in memory.

It seems to me that there is a logical gap in how we handle per-bundle buffering. I see two possibilities:

 a) either we allow the model to change the output watermark *while* a bundle is being processed, in which case it is a logical requirement that output timestamps from @ProcessElement be no earlier than the timestamp of the current element (that way we preserve the "on time"/"late" status of the current element; we don't swap anything), or

 b) we enforce output watermark updates only in between bundles, in which case the requirement could be relaxed so that output timestamps from @ProcessElement need only be no earlier than the minimum of the timestamps inside the bundle.

I'm afraid our current position is a). But in that case it is questionable whether it is semantically correct to use outputWithTimestamp() in the @ProcessElement of a stateless DoFn at all. It can only move timestamps to a future instant (and within the same window!), which has little semantic meaning to me. It looks more error-prone than useful.
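To make the difference between options a) and b) concrete, here is a small sketch in plain Java (not Beam). `OutputTimestampRules` and its methods are hypothetical names, and timestamps are modeled as `java.time.Instant`. The two timestamps are the ones from the error log quoted further down this thread: an element buffered earlier in the bundle (X0) is flushed while a later element (Y0) is being processed.

```java
import java.time.Instant;
import java.util.List;

// Hypothetical predicates for options (a) and (b); plain Java, not a Beam API.
final class OutputTimestampRules {
    private OutputTimestampRules() {}

    // (a) The output watermark may move mid-bundle, so an output from
    //     @ProcessElement must not be earlier than the current element's timestamp.
    static boolean allowedUnderA(Instant output, Instant currentElement) {
        return !output.isBefore(currentElement);
    }

    // (b) The output watermark only moves between bundles, so an output only
    //     needs to be no earlier than the minimum timestamp in the bundle.
    static boolean allowedUnderB(Instant output, List<Instant> bundle) {
        Instant min = bundle.stream().min(Instant::compareTo).orElseThrow();
        return !output.isBefore(min);
    }

    public static void main(String[] args) {
        Instant x0 = Instant.parse("2022-03-07T10:43:38.640Z"); // buffered earlier in the bundle
        Instant y0 = Instant.parse("2022-03-07T10:43:43.562Z"); // being processed at flush time
        System.out.println(allowedUnderA(x0, y0));              // false
        System.out.println(allowedUnderB(x0, List.of(x0, y0))); // true
    }
}
```

Under rule a) flushing X0 is rejected, which matches the failure reported below; under rule b) it would be accepted, because X0 belongs to the same bundle as Y0.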

 Jan

On 3/8/22 15:53, Evan Galpin wrote:
Thanks Jan, it's interesting to read about the handling of timestamps in cases employing a buffering pattern. In the case of the ES write transform, buffered data could be output from ProcessElement or FinishBundle. It's when data is output from ProcessElement that the error reported at the start of this thread shows up. Based on what you described, it sounds like the PR[1] I made to "fix" this issue is actually fixing it by transitioning data from being late to being on time and outputting all buffered data into a non-deterministic window where the output heuristic is later satisfied (number of elements buffered or max time since last output). It seems there are 2 issues as a result:

1. The window to which input elements belong is not being preserved. Presumably we want IOs to leave an element's window unaltered?
2. The watermark of the system might be incorrect, given that buffered data is assumed to be processed, which allows the watermark to pass it?

It would definitely add complexity to the IO to employ timers to address this, but if that's the preferred or only solution I'll put some thought into how to implement that solution.

Thanks,
Evan

[1] https://github.com/apache/beam/pull/16744

On Tue, Mar 8, 2022 at 3:47 AM Jan Lukavský <je...@seznam.cz> wrote:

    Ah, sorry, the code flushes in @FinishBundle. Is it allowed to
    update the output watermark while a bundle is being processed? It
    seems that could also cause the "watermark skip" problem, which is
    definitely an issue (and is probably the reason why the check fails?).

    On 3/8/22 09:35, Jan Lukavský wrote:

    The buffering seems incorrect to me. Whenever there is a buffer,
    we need to make sure we hold the output watermark; otherwise the
    watermark might "jump over" a buffered element, transitioning it
    from "on-time" to "late", which would be a correctness bug (we
    can transition elements only from "late" to "on-time", never the
    other way around). The alternative is to use @FinishBundle to do
    the flushing, but that might not be appropriate here.

    Currently, the only way to limit the progress of the output
    watermark is to set a timer whose output timestamp is the
    timestamp of the earliest element in the buffer. There was a
    thread discussing this in more detail [1].
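A minimal sketch of why a buffer needs a watermark hold, assuming a simplified notion of lateness: an element counts as late once the output watermark has passed its timestamp (window ends and allowed lateness are ignored). `WatermarkHoldSketch` is hypothetical plain Java, not a Beam API. In Beam Java the hold itself comes from a timer set with an output timestamp (`Timer.withOutputTimestamp(...)`); this code only models the effect of such a hold on the output watermark.

```java
import java.time.Instant;
import java.util.PriorityQueue;

// Hypothetical model of a watermark hold for an in-memory buffer (not a Beam API).
// Simplification: an element is "late" once the output watermark has passed its timestamp.
final class WatermarkHoldSketch {
    // Timestamps of elements still sitting in the buffer, earliest first.
    private final PriorityQueue<Instant> buffered = new PriorityQueue<>();

    void buffer(Instant ts) {
        buffered.add(ts);
    }

    void flushAll() {
        buffered.clear();
    }

    // With a hold, the output watermark cannot advance past the earliest buffered timestamp.
    Instant outputWatermark(Instant inputWatermark, boolean holdEnabled) {
        Instant earliest = buffered.peek();
        if (!holdEnabled || earliest == null || !earliest.isBefore(inputWatermark)) {
            return inputWatermark;
        }
        return earliest;
    }

    static boolean isLate(Instant elementTs, Instant outputWatermark) {
        return elementTs.isBefore(outputWatermark);
    }

    public static void main(String[] args) {
        WatermarkHoldSketch fn = new WatermarkHoldSketch();
        Instant x0 = Instant.parse("2022-03-07T10:43:38.640Z");
        fn.buffer(x0);
        Instant inputWm = Instant.parse("2022-03-07T10:43:43Z");
        // Without a hold the watermark "jumps over" X0 while it is still buffered.
        System.out.println(isLate(x0, fn.outputWatermark(inputWm, false))); // true
        // With a hold the output watermark stays at X0's timestamp, so X0 is still on time.
        System.out.println(isLate(x0, fn.outputWatermark(inputWm, true)));  // false
    }
}
```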

     Jan

    [1] https://lists.apache.org/thread/y4n5rnhfmh47orhhr92og2r0plvgopl8

    On 3/7/22 19:54, Evan Galpin wrote:
    In general, I'd also really like to improve my understanding and
    learn more about how the employed buffering can cause this
    skew. Is it because the call to "flush" is happening from a
    different "current window" than the one the elements were
    originally buffered from? I'm actually thinking that the PR[1] to
    "fix" this would have the side effect of outputting buffered
    elements into the window from which "flush" was called rather
    than the window from which the buffered data originated. I
    suppose that could be problematic, but it should at least satisfy
    the validation code.

    [1] https://github.com/apache/beam/pull/16744


    On Mon, Mar 7, 2022 at 1:39 PM Evan Galpin
    <evan.gal...@gmail.com> wrote:

        x-post from the associated Jira ticket[0]


        Fortunately/unfortunately this same issue struck me as well,
        and I opened a PR[1] to use `ProcessContext#output` rather
        than `ProcessContext#outputWithTimestamp`. I believe that
        should resolve this issue; it has for me when running jobs
        with a vendored SDK with that change included. Do folks
        feel this change should be cherry-picked into 2.37.0?

        The change also prompted a question to the mailing list[2]
        about the difference in skew validation between ProcessContext
        and FinishBundleContext (where, as I understand it, there is no
        ability to compute skew).

        [0] https://issues.apache.org/jira/browse/BEAM-14064

        [1] https://github.com/apache/beam/pull/16744

        [2]
        https://lists.apache.org/thread/33kj1yjmn6kkvpl4vz02vnfyn2bpzycp


        On Mon, Mar 7, 2022 at 12:41 PM Luke Cwik <lc...@google.com>
        wrote:

            This is specifically a case where @ProcessElement saw
            element X0 in window X and buffered it in memory, and
            then, when processing element Y0 in window Y, wanted to
            flush the previously buffered element X0. This all
            occurred as part of the same bundle.
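A sketch of the X0/Y0 scenario, assuming 5-second fixed windows aligned to the epoch. The window size is an arbitrary choice for illustration; the pipeline quoted in this thread does not show any windowing. `FixedWindowSketch` is hypothetical plain Java, not Beam's `FixedWindows`. It also models what switching the flush path from `outputWithTimestamp` to `output` does: `output` gives the flushed element the current input's timestamp, so X0 lands in Y0's window.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical model of epoch-aligned fixed windows; not Beam's FixedWindows.
final class FixedWindowSketch {
    private FixedWindowSketch() {}

    // Start of the fixed window of the given size that contains ts.
    static Instant windowStart(Instant ts, Duration size) {
        long t = ts.toEpochMilli();
        return Instant.ofEpochMilli(t - Math.floorMod(t, size.toMillis()));
    }

    // Timestamp a flushed element ends up with when emitted from @ProcessElement.
    // keepOriginal = true models outputWithTimestamp(value, bufferedTs);
    // keepOriginal = false models output(value), which inherits the current input's timestamp.
    static Instant flushedTimestamp(Instant bufferedTs, Instant currentInputTs, boolean keepOriginal) {
        return keepOriginal ? bufferedTs : currentInputTs;
    }

    public static void main(String[] args) {
        Duration size = Duration.ofSeconds(5);
        Instant x0 = Instant.parse("2022-03-07T10:43:38.640Z");
        Instant y0 = Instant.parse("2022-03-07T10:43:43.562Z");
        System.out.println("X0 window: " + windowStart(x0, size)); // X0 window: 2022-03-07T10:43:35Z
        System.out.println("Y0 window: " + windowStart(y0, size)); // Y0 window: 2022-03-07T10:43:40Z
        // X0 via output(): 2022-03-07T10:43:40Z
        System.out.println("X0 via output(): " + windowStart(flushedTimestamp(x0, y0, false), size));
    }
}
```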

            In general, yes, outputting to an earlier window is
            problematic.

            On Mon, Mar 7, 2022 at 9:32 AM Reuven Lax
            <re...@google.com> wrote:

                Outputting to an earlier window is problematic, as
                the watermark can never be correct if a DoFn can
                move time backwards arbitrarily.

                On Mon, Mar 7, 2022 at 9:01 AM Luke Cwik
                <lc...@google.com> wrote:

                    A good question would be whether we should be able
                    to output to a different window from the current
                    @ProcessElement call, as we can from @FinishBundle,
                    to handle these buffering scenarios.

                    On Mon, Mar 7, 2022 at 8:53 AM Luke Cwik
                    <lc...@google.com> wrote:

                        The issue is that ElasticsearchIO is
                        collecting results from elements in window X
                        and then trying to output them in window Y
                        when flushing the batch. This exposed a bug
                        where buffered elements were being output as
                        part of a different window than the window
                        that produced them.

                        This became visible because validation was
                        added recently to ensure that, when the
                        pipeline is processing elements in window X,
                        output with a timestamp is valid for window X.
                        Note that this validation only occurs in
                        @ProcessElement, since output there is
                        associated with the current window of the
                        input element being processed.
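The timestamp part of this check can be reduced to a simple predicate. The sketch below is a hypothetical plain-Java re-creation of the rule as worded in the error message quoted later in this thread (no earlier than the input timestamp minus the allowed skew, no later than a maximum timestamp). It is not the code in `SimpleDoFnRunner`, the class and method names are made up, `Instant.MAX` stands in for Beam's maximum timestamp, and the window check described above is not modeled. Feeding it the two timestamps from that error with zero skew reproduces the rejection.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical re-creation of the skew rule from the error message; not Beam source.
final class SkewCheckSketch {
    private SkewCheckSketch() {}

    // Throws if output is earlier than (input - allowedSkew) or later than maxTimestamp.
    static void checkTimestamp(Instant input, Instant output, Duration allowedSkew, Instant maxTimestamp) {
        Instant lowerBound = input.minus(allowedSkew);
        if (output.isBefore(lowerBound) || output.isAfter(maxTimestamp)) {
            throw new IllegalArgumentException(String.format(
                "Cannot output with timestamp %s. Output timestamps must be no earlier than the"
                    + " timestamp of the current input (%s) minus the allowed skew (%d milliseconds)"
                    + " and no later than %s.",
                output, input, allowedSkew.toMillis(), maxTimestamp));
        }
    }

    public static void main(String[] args) {
        Instant input = Instant.parse("2022-03-07T10:43:43.562Z");  // Y0, being processed
        Instant output = Instant.parse("2022-03-07T10:43:38.640Z"); // X0, flushed from the buffer
        try {
            // Instant.MAX is a stand-in for Beam's maximum timestamp.
            checkTimestamp(input, output, Duration.ZERO, Instant.MAX);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
        // A skew of at least 4922 ms (the gap between the two timestamps) lets this output through.
        checkTimestamp(input, output, Duration.ofMillis(4922), Instant.MAX);
    }
}
```

This is why overriding `getAllowedTimestampSkew()` looks tempting as a workaround, even though, as noted further down, it is deprecated and not reachable on the internal DoFn.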

                        It is OK to do this in @FinishBundle since
                        there is no existing windowing context there,
                        and when you output, the element is assigned
                        to an appropriate window.

                        Filed
                        https://issues.apache.org/jira/browse/BEAM-14064

                        On Mon, Mar 7, 2022 at 7:44 AM Emils
                        Solmanis <emils.solma...@rvu.co.uk> wrote:

                            Hi all,
                            I think we’re hitting a regression in
                            ElasticIO batch writing.

                            We’ve bisected it to being introduced in
                            2.35.0, and I’m reasonably certain it’s
                            this PR
                            https://github.com/apache/beam/pull/15381

                            Our scenario is pretty trivial: we read
                            off Pubsub and write to Elastic in a
                            streaming job. The configs for the source
                            and sink, respectively, are

                            pipeline.apply(
                                PubsubIO.readStrings().fromSubscription(subscription)
                            ).apply(ParseJsons.of(OurObject::class.java))
                                .setCoder(KryoCoder.of())

                            and

                            ElasticsearchIO.write()
                                .withUseStatefulBatches(true)
                                .withMaxParallelRequestsPerWindow(1)
                                .withMaxBufferingDuration(Duration.standardSeconds(30))
                                // 5 bytes -> KiB -> MiB, so 5 MiB
                                .withMaxBatchSizeBytes(5L * 1024 * 1024)
                                // # of docs
                                .withMaxBatchSize(1000)
                                .withConnectionConfiguration(
                                    ElasticsearchIO.ConnectionConfiguration.create(
                                        arrayOf(host), "fubar", "_doc"
                                    ).withConnectTimeout(5000)
                                        .withSocketTimeout(30000)
                                )
                                .withRetryConfiguration(
                                    ElasticsearchIO.RetryConfiguration.create(
                                        10,
                                        // the duration is wall clock, against the connection and socket timeouts specified
                                        // above. I.e., 10 x 30s is gonna be more than 3 minutes, so if we're getting
                                        // 10 socket timeouts in a row, this would ignore the "10" part and terminate
                                        // after 6. The idea is that in a mixed failure mode, you'd get different timeouts
                                        // of different durations, and on average 10 x fails < 4m.
                                        // That said, 4m is arbitrary, so adjust as and when needed.
                                        Duration.standardMinutes(4)
                                    )
                                )
                                .withIdFn { f: JsonNode -> f["id"].asText() }
                                .withIndexFn { f: JsonNode -> f["schema_name"].asText() }
                                .withIsDeleteFn { f: JsonNode -> f["_action"].asText("noop") == "delete" }
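The three limits in this config interact as "whichever trips first" (note that 5L * 1024 * 1024 is 5,242,880 bytes). Below is a hypothetical sketch of such a policy in plain Java. `BatchFlushPolicy` and `addAndCheck` are made-up names, the buffering duration is modeled as simple wall-clock time, and this is not ElasticsearchIO's implementation.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical "flush when any limit is reached" policy, mirroring the three
// limits in the config above. Not ElasticsearchIO's implementation.
final class BatchFlushPolicy {
    private final int maxDocs;           // cf. withMaxBatchSize
    private final long maxBytes;         // cf. withMaxBatchSizeBytes
    private final Duration maxBuffering; // cf. withMaxBufferingDuration (wall-clock time here)

    private int docs;
    private long bytes;
    private Instant firstBufferedAt; // null while the batch is empty

    BatchFlushPolicy(int maxDocs, long maxBytes, Duration maxBuffering) {
        this.maxDocs = maxDocs;
        this.maxBytes = maxBytes;
        this.maxBuffering = maxBuffering;
    }

    // Records one document of docBytes bytes seen at time now and reports whether
    // any limit has been reached, i.e. whether the caller should flush.
    boolean addAndCheck(long docBytes, Instant now) {
        if (firstBufferedAt == null) {
            firstBufferedAt = now;
        }
        docs++;
        bytes += docBytes;
        return docs >= maxDocs
            || bytes >= maxBytes
            || !now.isBefore(firstBufferedAt.plus(maxBuffering));
    }

    // Called after a flush to start a new, empty batch.
    void reset() {
        docs = 0;
        bytes = 0;
        firstBufferedAt = null;
    }
}
```

Whichever limit trips, the flush happens inside the call that added the last document. If that call is @ProcessElement for a later element, the flushed batch contains older elements, which is the situation that fails the timestamp check discussed in this thread.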

                            We recently tried upgrading from 2.33 to
                            2.36 and immediately hit a bug in the
                            consumer due to alleged time skew,
                            specifically:

                            2022-03-07 10:48:37.886 GMT Error
                            message from worker:
                            java.lang.IllegalArgumentException:
                            Cannot output with timestamp
                            2022-03-07T10:43:38.640Z. Output
                            timestamps must be no earlier than the
                            timestamp of the current input
                            (2022-03-07T10:43:43.562Z) minus the
                            allowed skew (0 milliseconds) and no
                            later than 294247-01-10T04:00:54.775Z.
                            See the DoFn#getAllowedTimestampSkew()
                            Javadoc for details on changing the
                            allowed skew.
                            org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:446)
                            org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:422)
                            org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn$ProcessContextAdapter.output(ElasticsearchIO.java:2364)
                            org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2404)
                            org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.addAndMaybeFlush(ElasticsearchIO.java:2419)
                            org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOStatefulFn.processElement(ElasticsearchIO.java:2300)

                            I’ve bisected it: 2.34 works fine, 2.35
                            is the first version where this breaks,
                            and it seems the code in the trace was
                            largely added by the PR linked above.
                            The error usually claims a skew of a few
                            seconds, but obviously I can’t override
                            `getAllowedTimestampSkew()` on the
                            internal Elastic DoFn, and it’s marked
                            deprecated anyway.

                            I’m happy to raise a JIRA, but I’m not
                            100% sure what the code was intended to
                            fix. I’d also be happy if someone else
                            could reproduce this or knows of similar
                            reports. I feel like what we’re doing is
                            not *that* uncommon a scenario, so I
                            would have thought someone else would
                            have hit this by now.

                            Best,
                            Emils
