In general, I'd also really like to improve my understanding and
learn more about how the employed buffering can cause this
skew. Is it because the call to "flush" is happening from a
different "current window" than the elements were originally
buffered from? I'm actually thinking that the PR[1] to "fix"
this would have had the side effect of outputting buffered
elements into the window from which "flush" was called rather
than the window from which the buffered data originated. I
suppose that could be problematic, but should at least satisfy
the validation code.
[1] https://github.com/apache/beam/pull/16744
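To make the question above concrete, here is a toy model of the buffering pattern in plain Java. It is not Beam or ElasticsearchIO code: the class, the `processElement` method, and the batch size of 2 are all hypothetical, and the check assumes an allowed skew of zero. It only illustrates how a flush triggered by a later element can try to emit an earlier element's timestamp.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Toy model of a batching DoFn. This is NOT Beam or ElasticsearchIO code;
// every class and method name here is hypothetical.
class BufferingSkewSketch {
    static final class Element {
        final String id;
        final Instant timestamp;

        Element(String id, Instant timestamp) {
            this.id = id;
            this.timestamp = timestamp;
        }
    }

    private final int maxBatchSize;
    private final List<Element> buffer = new ArrayList<>();

    BufferingSkewSketch(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    // Buffers the current element and flushes once the batch is full.
    // Returns the ids of flushed elements whose original timestamp is earlier
    // than the timestamp of the element that triggered the flush, i.e. the
    // outputs that a zero-skew check would reject.
    List<String> processElement(Element current) {
        buffer.add(current);
        List<String> rejected = new ArrayList<>();
        if (buffer.size() >= maxBatchSize) {
            for (Element buffered : buffer) {
                if (buffered.timestamp.isBefore(current.timestamp)) {
                    rejected.add(buffered.id);
                }
            }
            buffer.clear();
        }
        return rejected;
    }

    public static void main(String[] args) {
        BufferingSkewSketch fn = new BufferingSkewSketch(2);
        // X0 is only buffered; nothing is flushed yet.
        System.out.println(fn.processElement(
            new Element("X0", Instant.parse("2022-03-07T10:43:38.640Z"))));
        // Y0 fills the batch, so X0 is flushed while Y0 is the current input.
        System.out.println(fn.processElement(
            new Element("Y0", Instant.parse("2022-03-07T10:43:43.562Z"))));
    }
}
```

In this model the second call reports X0 as rejected, because X0 is emitted with its own (earlier) timestamp while Y0 is the element being processed.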
On Mon, Mar 7, 2022 at 1:39 PM Evan Galpin
<evan.gal...@gmail.com> wrote:
x-post from the associated Jira ticket[0]
Fortunately/unfortunately this same issue struck me as well,
and I opened a PR[1] to use `ProcessContext#output` rather
than `ProcessContext#outputWithTimestamp`. I believe that
should resolve this issue; it has for me when running jobs
with a vendored SDK with that change included. Do folks
feel this change should be cherry-picked into 2.37.0?
The change also prompted a question to the mailing list[2]
about the difference in skew validation between
ProcessContext and FinishBundleContext (where, as I
understand it, there is no ability to compute skew).
[0] https://issues.apache.org/jira/browse/BEAM-14064
[1] https://github.com/apache/beam/pull/16744
[2] https://lists.apache.org/thread/33kj1yjmn6kkvpl4vz02vnfyn2bpzycp
On Mon, Mar 7, 2022 at 12:41 PM Luke Cwik <lc...@google.com>
wrote:
This is specifically a case where the @ProcessElement
call saw element X0 in window X and buffered it in memory,
and then, while processing element Y0 in window Y,
wanted to flush the previously buffered element X0. This
all occurred as part of the same bundle.
In general, yes, outputting to an earlier window is
problematic.
On Mon, Mar 7, 2022 at 9:32 AM Reuven Lax
<re...@google.com> wrote:
Outputting to an earlier window is problematic, as
the watermark can never be correct if a DoFn can
move time backwards arbitrarily.
On Mon, Mar 7, 2022 at 9:01 AM Luke Cwik
<lc...@google.com> wrote:
A good question is whether a @ProcessElement call
should be able to output to a window other than
the current one, as we can from @FinishBundle,
to handle these buffering scenarios.
On Mon, Mar 7, 2022 at 8:53 AM Luke Cwik
<lc...@google.com> wrote:
The issue is that ElasticsearchIO is
collecting results from elements in window X
and then trying to output them in window Y
when flushing the batch. This exposed a bug
where buffered elements were being output as
part of a different window than the one that
produced them.
This became visible because validation was
added recently to ensure that, while the
pipeline is processing elements in window X,
any output with a timestamp is valid for
window X. Note that this validation only
occurs in @ProcessElement, since output there
is associated with the current window of the
input element being processed.
It is OK to do this in @FinishBundle, since
there is no existing windowing context there
and, when you output, the element is assigned
to an appropriate window.
Filed
https://issues.apache.org/jira/browse/BEAM-14064
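As a rough sketch of the @FinishBundle approach described above: each buffered element keeps its own timestamp and window and is emitted with both when the bundle ends, so nothing is forced into the window of whichever element happened to trigger the flush. In Beam this would go through `FinishBundleContext#output(value, timestamp, window)`; the code below is only a plain-Java model of that idea. The class, its methods, and the 5-second fixed windows aligned to the epoch are assumptions made for illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Toy model of flushing at the end of a bundle. NOT Beam code;
// all names and the fixed-window scheme are hypothetical.
class FinishBundleFlushSketch {
    static final class Buffered {
        final String value;
        final Instant timestamp;
        final Instant windowStart;

        Buffered(String value, Instant timestamp, Instant windowStart) {
            this.value = value;
            this.timestamp = timestamp;
            this.windowStart = windowStart;
        }
    }

    private final Duration windowSize;
    private final List<Buffered> buffer = new ArrayList<>();

    FinishBundleFlushSketch(Duration windowSize) {
        this.windowSize = windowSize;
    }

    // Start of the fixed window (aligned to the epoch) that contains the timestamp.
    static Instant windowStart(Instant timestamp, Duration size) {
        long millis = timestamp.toEpochMilli();
        return Instant.ofEpochMilli(millis - Math.floorMod(millis, size.toMillis()));
    }

    // Remembers the element together with its own timestamp and window.
    void processElement(String value, Instant timestamp) {
        buffer.add(new Buffered(value, timestamp, windowStart(timestamp, windowSize)));
    }

    // Emits every buffered element with the timestamp and window it arrived in.
    List<Buffered> finishBundle() {
        List<Buffered> flushed = new ArrayList<>(buffer);
        buffer.clear();
        return flushed;
    }

    public static void main(String[] args) {
        FinishBundleFlushSketch fn = new FinishBundleFlushSketch(Duration.ofSeconds(5));
        fn.processElement("X0", Instant.parse("2022-03-07T10:43:38.640Z"));
        fn.processElement("Y0", Instant.parse("2022-03-07T10:43:43.562Z"));
        for (Buffered b : fn.finishBundle()) {
            System.out.println(b.value + " @ " + b.timestamp + " in window starting " + b.windowStart);
        }
    }
}
```

With these assumed windows, X0 is emitted into the window starting at 10:43:35 and Y0 into the one starting at 10:43:40, each with its original timestamp.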
On Mon, Mar 7, 2022 at 7:44 AM Emils
Solmanis <emils.solma...@rvu.co.uk> wrote:
Hi all,
I think we’re hitting a regression in
ElasticsearchIO batch writing.
We’ve bisected it to being introduced in
2.35.0, and I’m reasonably certain it’s
this PR
https://github.com/apache/beam/pull/15381
Our scenario is pretty trivial: we read
off Pubsub and write to Elastic in a
streaming job. The configs for the source
and sink are, respectively,
|pipeline.apply(
    PubsubIO.readStrings().fromSubscription(subscription)
).apply(ParseJsons.of(OurObject::class.java))
    .setCoder(KryoCoder.of()) |
and
|ElasticsearchIO.write()
    .withUseStatefulBatches(true)
    .withMaxParallelRequestsPerWindow(1)
    .withMaxBufferingDuration(Duration.standardSeconds(30))
    // 5 bytes -> KiB -> MiB, so 5 MiB
    .withMaxBatchSizeBytes(5L * 1024 * 1024)
    // # of docs
    .withMaxBatchSize(1000)
    .withConnectionConfiguration(
        ElasticsearchIO.ConnectionConfiguration.create(
            arrayOf(host), "fubar", "_doc"
        ).withConnectTimeout(5000)
            .withSocketTimeout(30000)
    )
    .withRetryConfiguration(
        ElasticsearchIO.RetryConfiguration.create(
            10,
            // the duration is wall clock, against the connection and socket timeouts specified
            // above. I.e., 10 x 30s is gonna be more than 3 minutes, so if we're getting
            // 10 socket timeouts in a row, this would ignore the "10" part and terminate
            // after 6. The idea is that in a mixed failure mode, you'd get different timeouts
            // of different durations, and on average 10 x fails < 4m.
            // That said, 4m is arbitrary, so adjust as and when needed.
            Duration.standardMinutes(4)
        )
    )
    .withIdFn { f: JsonNode -> f["id"].asText() }
    .withIndexFn { f: JsonNode -> f["schema_name"].asText() }
    .withIsDeleteFn { f: JsonNode -> f["_action"].asText("noop") == "delete" } |
We recently tried upgrading from 2.33 to
2.36 and immediately hit a bug in the
consumer, due to alleged time skew,
specifically:
|2022-03-07 10:48:37.886 GMT Error
message from worker:
java.lang.IllegalArgumentException:
Cannot output with timestamp
2022-03-07T10:43:38.640Z. Output
timestamps must be no earlier than the
timestamp of the current input
(2022-03-07T10:43:43.562Z) minus the
allowed skew (0 milliseconds) and no
later than 294247-01-10T04:00:54.775Z.
See the DoFn#getAllowedTimestampSkew()
Javadoc for details on changing the
allowed skew.
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:446)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:422)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn$ProcessContextAdapter.output(ElasticsearchIO.java:2364)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2404)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.addAndMaybeFlush(ElasticsearchIO.java:2419)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOStatefulFn.processElement(ElasticsearchIO.java:2300)
|
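The rule stated in the error message can be modelled in a few lines. This is a minimal sketch in plain Java, using `java.time` rather than Beam's Joda-Time types; the class and method names are hypothetical and it is not the actual `SimpleDoFnRunner` implementation. It plugs in the two timestamps from the message above.

```java
import java.time.Duration;
import java.time.Instant;

// Simplified model of the timestamp check described in the error message.
// NOT Beam's SimpleDoFnRunner code; the names here are hypothetical.
class SkewCheckSketch {
    // The upper bound quoted in the message, 294247-01-10T04:00:54.775Z,
    // expressed as epoch milliseconds.
    static final Instant MAX_TIMESTAMP = Instant.ofEpochMilli(Long.MAX_VALUE / 1000);

    // An output timestamp is accepted if it is no earlier than the current
    // input's timestamp minus the allowed skew, and no later than the maximum.
    static boolean isAllowed(Instant output, Instant currentInput, Duration allowedSkew) {
        Instant earliest = currentInput.minus(allowedSkew);
        return !output.isBefore(earliest) && !output.isAfter(MAX_TIMESTAMP);
    }

    public static void main(String[] args) {
        Instant output = Instant.parse("2022-03-07T10:43:38.640Z");
        Instant input = Instant.parse("2022-03-07T10:43:43.562Z");
        // How far behind the current input the buffered output is: prints 4922
        System.out.println(Duration.between(output, input).toMillis());
        // With the default skew of 0 ms the output is rejected: prints false
        System.out.println(isAllowed(output, input, Duration.ZERO));
        // A skew of 5 s would have let this particular output through: prints true
        System.out.println(isAllowed(output, input, Duration.ofSeconds(5)));
    }
}
```

The output timestamp in the trace is 4.922 seconds behind the current input, which matches the "skew of a few seconds" seen in practice.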
I’ve bisected it: 2.34 works fine and
2.35 is the first version where this breaks,
and it seems like the code in the trace
was largely added by the PR linked above.
The error usually claims a skew of a few
seconds, but obviously I can’t override
|getAllowedTimestampSkew()| on the
internal Elastic DoFn, and it’s marked
deprecated anyway.
I’m happy to raise a JIRA but I’m not
100% sure what the code was intending to
fix, and additionally, I’d also be happy
if someone else can reproduce this or
knows of similar reports. I feel like
what we’re doing is not *that* uncommon
a scenario, so I would have thought
someone else would have hit this by now.
Best,
Emils