Luke Cwik created BEAM-14064:
--------------------------------
Summary: ElasticSearchIO#Write buffering and outputting across windows
Key: BEAM-14064
URL: https://issues.apache.org/jira/browse/BEAM-14064
Project: Beam
Issue Type: Bug
Components: io-java-elasticsearch
Affects Versions: 2.36.0, 2.35.0
Reporter: Luke Cwik
Assignee: Evan Galpin
Source: https://lists.apache.org/thread/mtwtno2o88lx3zl12jlz7o5w1lcgm2db
Bug PR: https://github.com/apache/beam/pull/15381
ElasticsearchIO is collecting results from elements in window X and then trying
to output them in window Y when flushing the batch. This exposed a bug where
buffered elements were being output as part of a different window than the one
that produced them.
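The buffering behaviour described above can be illustrated with a toy model. This is only a sketch and not the ElasticsearchIO implementation: ToyBatcher and Buffered are hypothetical names, and the two timestamps are taken from the error message quoted further down in this report.

```kotlin
import java.time.Instant

// Hypothetical stand-in for a buffered document; only its event timestamp matters here.
data class Buffered(val id: String, val timestamp: Instant)

// Toy batcher (assumption, not Beam code): collects elements and flushes
// the whole buffer once maxBatchSize elements have been added.
class ToyBatcher(private val maxBatchSize: Int) {
    private val buffer = mutableListOf<Buffered>()

    // Returns the flushed batch when the buffer is full, otherwise an empty list.
    fun addAndMaybeFlush(element: Buffered): List<Buffered> {
        buffer.add(element)
        if (buffer.size < maxBatchSize) return emptyList()
        val flushed = buffer.toList()
        buffer.clear()
        return flushed
    }
}

fun main() {
    val batcher = ToyBatcher(maxBatchSize = 2)
    val early = Buffered("a", Instant.parse("2022-03-07T10:43:38.640Z"))
    val late = Buffered("b", Instant.parse("2022-03-07T10:43:43.562Z"))

    batcher.addAndMaybeFlush(early)               // buffered, nothing emitted yet
    val flushed = batcher.addAndMaybeFlush(late)  // flush happens while "late" is the current input

    // Flushed elements older than the current input would be emitted with a
    // timestamp (and possibly a window) that does not match the element being processed.
    val stale = flushed.filter { it.timestamp.isBefore(late.timestamp) }
    println("current input: ${late.timestamp}, stale outputs: ${stale.map { it.id }}")
}
```

In this model the result for element "a" is emitted while "b" is being processed, which is the situation the report describes for results crossing from window X into window Y.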
This became visible because validation was added recently to ensure that, when
the pipeline is processing elements in window X, any output with a timestamp is
valid for window X. Note that this validation only occurs in @ProcessElement,
since output there is associated with the current window of the input element
that is being processed.
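As a rough illustration of the lower-bound rule, here is a sketch based only on the wording of the error message quoted later in this report. The helper isOutputTimestampAllowed is a hypothetical name, not a Beam API, and the upper bound mentioned in the message is left out for brevity.

```kotlin
import java.time.Duration
import java.time.Instant

// Hypothetical helper mirroring the rule as worded in the error message:
// the output timestamp must be no earlier than (current input timestamp - allowed skew).
fun isOutputTimestampAllowed(
    output: Instant,
    currentInput: Instant,
    allowedSkew: Duration
): Boolean = !output.isBefore(currentInput.minus(allowedSkew))

fun main() {
    // Values copied from the reported error message.
    val output = Instant.parse("2022-03-07T10:43:38.640Z")
    val input = Instant.parse("2022-03-07T10:43:43.562Z")

    println(isOutputTimestampAllowed(output, input, Duration.ZERO))  // false
    println(Duration.between(output, input).toMillis())              // 4922
}
```

With the reported values the buffered output is about 4.9 seconds older than the current input, so with an allowed skew of 0 milliseconds the check fails.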
Further Context
We’ve bisected it to being introduced in 2.35.0, and I’m reasonably certain
it’s this PR https://github.com/apache/beam/pull/15381
Our scenario is pretty trivial: we read off Pubsub and write to Elastic in a
streaming job. The config for the source and the sink is, respectively,
{noformat}
pipeline.apply(
    PubsubIO.readStrings().fromSubscription(subscription)
).apply(ParseJsons.of(OurObject::class.java))
    .setCoder(KryoCoder.of())
{noformat}
and
{noformat}
ElasticsearchIO.write()
    .withUseStatefulBatches(true)
    .withMaxParallelRequestsPerWindow(1)
    .withMaxBufferingDuration(Duration.standardSeconds(30))
    // 5 bytes -> KiB -> MiB, so 5 MiB
    .withMaxBatchSizeBytes(5L * 1024 * 1024)
    // # of docs
    .withMaxBatchSize(1000)
    .withConnectionConfiguration(
        ElasticsearchIO.ConnectionConfiguration.create(
            arrayOf(host),
            "fubar",
            "_doc"
        ).withConnectTimeout(5000)
            .withSocketTimeout(30000)
    )
    .withRetryConfiguration(
        ElasticsearchIO.RetryConfiguration.create(
            10,
            // the duration is wall clock, against the connection and socket timeouts specified
            // above. I.e., 10 x 30s is gonna be more than 3 minutes, so if we're getting
            // 10 socket timeouts in a row, this would ignore the "10" part and terminate
            // after 6. The idea is that in a mixed failure mode, you'd get different timeouts
            // of different durations, and on average 10 x fails < 4m.
            // That said, 4m is arbitrary, so adjust as and when needed.
            Duration.standardMinutes(4)
        )
    )
    .withIdFn { f: JsonNode -> f["id"].asText() }
    .withIndexFn { f: JsonNode -> f["schema_name"].asText() }
    .withIsDeleteFn { f: JsonNode -> f["_action"].asText("noop") == "delete" }
{noformat}
We recently tried upgrading from 2.33 to 2.36 and immediately hit a bug in the
consumer, due to alleged time skew, specifically
{noformat}
2022-03-07 10:48:37.886 GMT Error message from worker:
java.lang.IllegalArgumentException: Cannot output with timestamp
2022-03-07T10:43:38.640Z. Output timestamps must be no earlier than the
timestamp of the current input (2022-03-07T10:43:43.562Z) minus the allowed
skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the
DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:446)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:422)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn$ProcessContextAdapter.output(ElasticsearchIO.java:2364)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2404)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.addAndMaybeFlush(ElasticsearchIO.java:2419)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOStatefulFn.processElement(ElasticsearchIO.java:2300)
{noformat}
I’ve bisected it: 2.34 works fine and 2.35 is the first version where this
breaks, and it seems like the code in the trace was largely added by the PR
linked above. The error usually claims a skew of a few seconds, but obviously I
can’t override getAllowedTimestampSkew() on the internal Elastic DoFn, and it’s
marked deprecated anyway.
I’m happy to raise a JIRA but I’m not 100% sure what the code was intending to
fix, and additionally, I’d also be happy if someone else can reproduce this or
knows of similar reports. I feel like what we’re doing is not that uncommon a
scenario, so I would have thought someone else would have hit this by now.