[
https://issues.apache.org/jira/browse/BEAM-14064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17512106#comment-17512106
]
Daniel Oliveira commented on BEAM-14064:
----------------------------------------
Hi, I'm managing the 2.38.0 release. I see this is set to release-blocking, but
I'm not sure if it's actually worth blocking the release.
While it does fit our "significant regression or loss of functionality"
requirement [from the
website|https://beam.apache.org/contribute/release-blocking/], we've set a
precedent of blocking for new regressions and not blocking for known existing
issues. Basically, since this issue has been present for the past 3 releases,
it's not a new regression and not something we want to block a release on. The
cat's already out of the bag, so to speak. In addition to all that, the scope
of this seems limited to ElasticsearchIO. If it had broad impact it might be
worth making an exception, but not as it stands.
Evan, can we take this off the release blocker list and just get it in for
2.39.0 instead? Do you have an argument for keeping it as a release blocker?
> ElasticSearchIO#Write buffering and outputting across windows
> -------------------------------------------------------------
>
> Key: BEAM-14064
> URL: https://issues.apache.org/jira/browse/BEAM-14064
> Project: Beam
> Issue Type: Bug
> Components: io-java-elasticsearch
> Affects Versions: 2.35.0, 2.36.0, 2.37.0
> Reporter: Luke Cwik
> Assignee: Evan Galpin
> Priority: P1
> Fix For: 2.38.0
>
> Time Spent: 5.5h
> Remaining Estimate: 0h
>
> Source: https://lists.apache.org/thread/mtwtno2o88lx3zl12jlz7o5w1lcgm2db
> Bug PR: https://github.com/apache/beam/pull/15381
> ElasticsearchIO is collecting results from elements in window X and then
> trying to output them in window Y when flushing the batch. This exposed a bug
> where buffered elements were being output as part of a different window than
> the one that produced them.
> This became visible because validation was recently added to ensure that when
> the pipeline is processing an element in window X, any output with a timestamp
> is valid for window X. Note that this validation only occurs in
> *@ProcessElement*, since output there is associated with the window of the
> input element currently being processed.
> It is ok to do this in *@FinishBundle*, since there is no current windowing
> context and, when you output, the element is assigned to an appropriate window.
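> As a rough sketch (hypothetical DoFn and field names, not the actual
> ElasticsearchIO code), the difference looks like this: flushing buffered
> elements from *@ProcessElement* is validated against the current input's
> timestamp and window, while *@FinishBundle* lets each buffered element be
> assigned back to the window it originally came from.
> {noformat}
> import org.apache.beam.sdk.transforms.DoFn
> import org.apache.beam.sdk.transforms.windowing.BoundedWindow
> import org.joda.time.Instant
>
> class BufferAcrossBundlesFn : DoFn<String, String>() {
>     // Hypothetical buffer: each element is kept with the timestamp and
>     // window it arrived with.
>     private val buffered = mutableListOf<Triple<String, Instant, BoundedWindow>>()
>
>     @ProcessElement
>     fun process(context: ProcessContext, window: BoundedWindow) {
>         buffered.add(Triple(context.element(), context.timestamp(), window))
>         // Flushing here via context.outputWithTimestamp(value, ts) is checked
>         // against the *current* element's timestamp and window, so emitting
>         // an element buffered from an earlier window triggers the
>         // "Cannot output with timestamp ..." error quoted below.
>     }
>
>     @FinishBundle
>     fun finish(context: FinishBundleContext) {
>         // No current element here: output(value, timestamp, window) assigns
>         // each buffered element back to its own window.
>         buffered.forEach { (value, ts, window) -> context.output(value, ts, window) }
>         buffered.clear()
>     }
> }
> {noformat}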
> *Further Context*
> We’ve bisected the regression to 2.35.0, and I’m reasonably certain it was
> introduced by this PR: https://github.com/apache/beam/pull/15381
> Our scenario is pretty trivial: we read off Pubsub and write to Elastic in a
> streaming job. The config for the source and sink, respectively, is
> {noformat}
> pipeline.apply(
>     PubsubIO.readStrings().fromSubscription(subscription)
> ).apply(ParseJsons.of(OurObject::class.java))
>     .setCoder(KryoCoder.of())
> {noformat}
> and
> {noformat}
> ElasticsearchIO.write()
>     .withUseStatefulBatches(true)
>     .withMaxParallelRequestsPerWindow(1)
>     .withMaxBufferingDuration(Duration.standardSeconds(30))
>     // 5 bytes -> KiB -> MiB, so 5 MiB
>     .withMaxBatchSizeBytes(5L * 1024 * 1024)
>     // # of docs
>     .withMaxBatchSize(1000)
>     .withConnectionConfiguration(
>         ElasticsearchIO.ConnectionConfiguration.create(
>             arrayOf(host),
>             "fubar",
>             "_doc"
>         ).withConnectTimeout(5000)
>             .withSocketTimeout(30000)
>     )
>     .withRetryConfiguration(
>         ElasticsearchIO.RetryConfiguration.create(
>             10,
>             // the duration is wall clock, against the connection and socket
>             // timeouts specified above. I.e., 10 x 30s is gonna be more than
>             // 3 minutes, so if we're getting 10 socket timeouts in a row, this
>             // would ignore the "10" part and terminate after 6. The idea is
>             // that in a mixed failure mode, you'd get different timeouts of
>             // different durations, and on average 10 x fails < 4m.
>             // That said, 4m is arbitrary, so adjust as and when needed.
>             Duration.standardMinutes(4)
>         )
>     )
>     .withIdFn { f: JsonNode -> f["id"].asText() }
>     .withIndexFn { f: JsonNode -> f["schema_name"].asText() }
>     .withIsDeleteFn { f: JsonNode -> f["_action"].asText("noop") == "delete" }
> {noformat}
> We recently tried upgrading from 2.33 to 2.36 and immediately hit a bug in
> the consumer due to alleged time skew, specifically:
> {noformat}
> 2022-03-07 10:48:37.886 GMT Error message from worker:
> java.lang.IllegalArgumentException: Cannot output with timestamp
> 2022-03-07T10:43:38.640Z. Output timestamps must be no earlier than the
> timestamp of the current input (2022-03-07T10:43:43.562Z) minus the allowed
> skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the
> DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
>     org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:446)
>     org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:422)
>     org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn$ProcessContextAdapter.output(ElasticsearchIO.java:2364)
>     org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2404)
>     org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.addAndMaybeFlush(ElasticsearchIO.java:2419)
>     org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOStatefulFn.processElement(ElasticsearchIO.java:2300)
> {noformat}
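> (For reference, in this trace the buffered element's timestamp is
> 2022-03-07T10:43:38.640Z while the current input's is 2022-03-07T10:43:43.562Z,
> i.e. the output is roughly 4.9 seconds earlier than the input, against an
> allowed skew of 0 milliseconds.)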
> I’ve bisected it: 2.34 works fine and 2.35 is the first version where this
> breaks, and the code in the trace seems to be largely added by the PR linked
> above. The error usually claims a skew of a few seconds, but obviously I
> can’t override getAllowedTimestampSkew() on the internal Elastic DoFn, and
> it’s marked deprecated anyway.
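> For context, this is the (deprecated) hook being referred to. It can only be
> overridden on a DoFn you own, which is why it isn't an option here; the class
> below is a hypothetical user-owned DoFn, not part of ElasticsearchIO:
> {noformat}
> import org.apache.beam.sdk.transforms.DoFn
> import org.joda.time.Duration
>
> class MySkewTolerantFn : DoFn<String, String>() {
>     // Deprecated escape hatch: allow outputs up to 5 minutes older than the
>     // current input's timestamp. Not available for IO-internal DoFns.
>     override fun getAllowedTimestampSkew(): Duration = Duration.standardMinutes(5)
>
>     @ProcessElement
>     fun process(context: ProcessContext) {
>         context.output(context.element())
>     }
> }
> {noformat}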
> I’m happy to raise a JIRA, but I’m not 100% sure what the code was intending
> to fix. I’d also be happy if someone else can reproduce this or knows of
> similar reports. What we’re doing doesn’t feel like an uncommon scenario, so I
> would have thought someone else would have hit this by now.