Hi all,
I think we’re hitting a regression in ElasticsearchIO batch writing.

We’ve bisected it to 2.35.0, and I’m reasonably certain it was introduced
by this PR: https://github.com/apache/beam/pull/15381

Our scenario is pretty trivial: we read off Pub/Sub and write to
Elasticsearch in a streaming job. The source and sink are configured,
respectively, as follows:

pipeline.apply(
            PubsubIO.readStrings().fromSubscription(subscription)
        ).apply(ParseJsons.of(OurObject::class.java))
            .setCoder(KryoCoder.of())

and

ElasticsearchIO.write()
            .withUseStatefulBatches(true)
            .withMaxParallelRequestsPerWindow(1)
            .withMaxBufferingDuration(Duration.standardSeconds(30))
            // 5 x 1024 x 1024 bytes, i.e. 5 MiB
            .withMaxBatchSizeBytes(5L * 1024 * 1024)
            // number of documents
            .withMaxBatchSize(1000)
            .withConnectionConfiguration(
                ElasticsearchIO.ConnectionConfiguration.create(
                    arrayOf(host),
                    "fubar",
                    "_doc"
                ).withConnectTimeout(5000)
                    .withSocketTimeout(30000)
            )
            .withRetryConfiguration(
                ElasticsearchIO.RetryConfiguration.create(
                    10,
                    // The duration is wall clock, measured against the connection
                    // and socket timeouts specified above. 10 x 30s is 5 minutes,
                    // more than the 4-minute limit, so if we get 10 socket timeouts
                    // in a row, this ignores the "10" and terminates after 6.
                    // The idea is that in a mixed failure mode you'd get timeouts
                    // of different durations, and on average 10 failures take < 4m.
                    // That said, 4m is arbitrary, so adjust as and when needed.
                    Duration.standardMinutes(4)
                )
            )
            .withIdFn { f: JsonNode -> f["id"].asText() }
            .withIndexFn { f: JsonNode -> f["schema_name"].asText() }
            .withIsDeleteFn { f: JsonNode -> f["_action"].asText("noop") == "delete" }
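The interplay between the attempt cap and the wall-clock limit can be checked with a quick back-of-the-envelope calculation. This is a minimal sketch, assuming every failed attempt costs exactly the given timeout and ignoring any backoff between retries (backoff would only lower the count further). `attemptsBeforeGivingUp` is a hypothetical helper, not part of ElasticsearchIO, and it uses `java.time.Duration` rather than Joda to stay dependency-free.

```kotlin
import java.time.Duration

// Hypothetical helper (not part of ElasticsearchIO): how many attempts fit
// before either the attempt cap or the wall-clock budget is exhausted,
// assuming every attempt takes exactly `perAttempt` and there is no backoff.
fun attemptsBeforeGivingUp(maxAttempts: Int, perAttempt: Duration, budget: Duration): Int {
    require(!perAttempt.isZero && !perAttempt.isNegative) { "perAttempt must be positive" }
    // Number of whole attempts that complete within the budget.
    val byTime = budget.toMillis() / perAttempt.toMillis()
    return minOf(maxAttempts.toLong(), byTime).toInt()
}

fun main() {
    val socketTimeout = Duration.ofSeconds(30)
    val budget = Duration.ofMinutes(4)
    // 10 x 30s = 5 minutes, which exceeds the 4-minute budget...
    println(socketTimeout.multipliedBy(10L) > budget) // true
    // ...so the time limit, not the attempt cap, ends the retries.
    println(attemptsBeforeGivingUp(10, socketTimeout, budget)) // 8
    // With faster failures (e.g. 5s connect timeouts) the cap of 10 applies.
    println(attemptsBeforeGivingUp(10, Duration.ofSeconds(5), budget)) // 10
}
```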

We recently tried upgrading from 2.33 to 2.36 and immediately hit a bug in
the consumer (the Elasticsearch write step), which the error attributes to
timestamp skew:

2022-03-07 10:48:37.886 GMT Error message from worker:
java.lang.IllegalArgumentException: Cannot output with timestamp 2022-03-07T10:43:38.640Z. Output timestamps must be no earlier than the timestamp of the current input (2022-03-07T10:43:43.562Z) minus the allowed skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:446)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:422)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn$ProcessContextAdapter.output(ElasticsearchIO.java:2364)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2404)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.addAndMaybeFlush(ElasticsearchIO.java:2419)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOStatefulFn.processElement(ElasticsearchIO.java:2300)
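The lower-bound condition in the error message can be reproduced in isolation. This is a minimal sketch of the rule as the message states it (an output timestamp must be no earlier than the input timestamp minus the allowed skew), not Beam's actual `SimpleDoFnRunner` code; `isOutputTimestampAllowed` is a hypothetical name. Plugging in the two timestamps from the log shows the output is 4.922 seconds too early for a skew of 0.

```kotlin
import java.time.Duration
import java.time.Instant

// Hypothetical restatement of the check described in the error message:
// an output timestamp is accepted only if it is no earlier than
// (input timestamp - allowed skew). The upper bound (the maximum timestamp)
// is omitted because it is not what fails here.
fun isOutputTimestampAllowed(output: Instant, input: Instant, allowedSkew: Duration): Boolean =
    !output.isBefore(input.minus(allowedSkew))

fun main() {
    val input = Instant.parse("2022-03-07T10:43:43.562Z")  // current input element
    val output = Instant.parse("2022-03-07T10:43:38.640Z") // timestamp being emitted
    println(Duration.between(output, input).toMillis())                     // 4922
    println(isOutputTimestampAllowed(output, input, Duration.ZERO))         // false
    println(isOutputTimestampAllowed(output, input, Duration.ofSeconds(5))) // true
}
```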

I’ve bisected it: 2.34 works fine, 2.35 is the first version that breaks,
and the code in the trace appears to have been largely added by the PR
linked above. The error usually reports a skew of a few seconds, but I
can’t override getAllowedTimestampSkew() on the internal Elasticsearch
DoFn, and that method is deprecated anyway.
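One plausible mechanism, consistent with the call chain in the trace (processElement, then addAndMaybeFlush, then flushAndOutputResults, then output), is that a flush triggered while processing one element emits results for earlier buffered documents using their own, older timestamps. The sketch below models only that hypothesis in plain Kotlin; `Doc`, `BufferingWriter`, and the batch-size flush trigger are hypothetical stand-ins, not ElasticsearchIO internals, and this does not show what the PR actually does.

```kotlin
import java.time.Duration
import java.time.Instant

// Hypothetical stand-ins for a buffering sink; not ElasticsearchIO's code.
data class Doc(val id: String, val timestamp: Instant)

class BufferingWriter(private val maxBatchSize: Int, private val allowedSkew: Duration = Duration.ZERO) {
    private val buffer = mutableListOf<Doc>()

    // Mimics processElement: buffer the doc and flush once the batch is full.
    // Returns the ids that would violate the skew check if each buffered doc
    // were output with its own timestamp while `current` is the input element.
    fun processElement(current: Doc): List<String> {
        buffer += current
        if (buffer.size < maxBatchSize) return emptyList()
        val earliestAllowed = current.timestamp.minus(allowedSkew)
        val violations = buffer.filter { it.timestamp.isBefore(earliestAllowed) }.map { it.id }
        buffer.clear() // the batch counts as flushed either way
        return violations
    }
}

fun main() {
    val writer = BufferingWriter(maxBatchSize = 3)
    val t0 = Instant.parse("2022-03-07T10:43:38.640Z")
    println(writer.processElement(Doc("a", t0)))                // []
    println(writer.processElement(Doc("b", t0.plusSeconds(2)))) // []
    // The third doc fills the batch; "a" and "b" carry older timestamps.
    println(writer.processElement(Doc("c", t0.plusSeconds(5)))) // [a, b]
}
```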

I’m happy to raise a JIRA, but I’m not 100% sure what the code was intended
to fix. I’d also be glad to hear if someone else can reproduce this or
knows of similar reports. What we’re doing doesn’t feel like *that*
uncommon a scenario, so I would have expected someone else to hit this by
now.

Best,
Emils
