The buffering seems incorrect to me. Whenever there is a buffer, we need
to make sure we hold the output watermark, otherwise the watermark might
"jump over" a buffered element transitioning it from "on-time" to
"late", which would be a correctness bug (we can transition elements
only from "late" to "on-time", never the other way around). The
alternative is to use @FinishBundle to do the flushing, but that might not be
appropriate here.
Currently, the only way to limit the progress of the output watermark is by
setting a timer whose output timestamp is the timestamp of the
earliest element in the buffer. There was a thread discussing
this in more detail [1].
Jan
[1] https://lists.apache.org/thread/y4n5rnhfmh47orhhr92og2r0plvgopl8
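To make the "jump over" problem concrete, here is a minimal sketch in plain Java. It is a toy model, not Beam code: the class, the method names and the timestamps are all made up for illustration, and "late" is simplified to "timestamp is behind the output watermark". It only tries to show that holding the output watermark at the earliest buffered timestamp keeps a buffered element on time until it is flushed.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy model of an output watermark hold; it does not use Beam's APIs. */
public class WatermarkHoldSketch {
    // Timestamps of elements that were buffered but not yet flushed.
    private final List<Instant> buffered = new ArrayList<>();

    public void buffer(Instant elementTimestamp) {
        buffered.add(elementTimestamp);
    }

    /** Flushing empties the buffer and thereby releases the hold. */
    public void flush() {
        buffered.clear();
    }

    /** Output watermark with a hold: it cannot pass the earliest buffered timestamp. */
    public Instant heldOutputWatermark(Instant inputWatermark) {
        if (buffered.isEmpty()) {
            return inputWatermark;
        }
        Instant earliest = Collections.min(buffered);
        return earliest.isBefore(inputWatermark) ? earliest : inputWatermark;
    }

    /** Simplified lateness: the element's timestamp is behind the output watermark. */
    public static boolean isLate(Instant elementTimestamp, Instant outputWatermark) {
        return elementTimestamp.isBefore(outputWatermark);
    }

    public static void main(String[] args) {
        WatermarkHoldSketch sketch = new WatermarkHoldSketch();
        Instant element = Instant.parse("2022-03-07T10:00:05Z");        // hypothetical
        Instant inputWatermark = Instant.parse("2022-03-07T10:00:30Z"); // hypothetical
        sketch.buffer(element);

        // Without a hold the output watermark follows the input watermark,
        // so the buffered element would be late by the time it is flushed.
        System.out.println(isLate(element, inputWatermark)); // prints "true"

        // With the hold the buffered element is still on time.
        System.out.println(isLate(element, sketch.heldOutputWatermark(inputWatermark))); // prints "false"
    }
}
```

In a real pipeline the hold would presumably come from the timer's output timestamp described above; the sketch only models its effect.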
On 3/7/22 19:54, Evan Galpin wrote:
In general, I'd also really like to improve my understanding and learn
more about how the employed buffering can cause this skew. Is it
because the call to "flush" is happening from a different "current
window" than the elements were originally buffered from? I'm actually
thinking that the PR[1] to "fix" this would have had the side effect
of outputting buffered elements into the window from which "flush" was
called rather than the window from which the buffered data originated.
I suppose that could be problematic, but should at least satisfy the
validation code.
[1] https://github.com/apache/beam/pull/16744
On Mon, Mar 7, 2022 at 1:39 PM Evan Galpin <evan.gal...@gmail.com> wrote:
x-post from the associated Jira ticket[0]
Fortunately/unfortunately this same issue struck me as well, and I
opened a PR[1] to use `ProcessContext#output` rather than
`ProcessContext#outputWithTimestamp`. I believe that should
resolve this issue; it has for me when running jobs with a
vendored SDK with that change included. Do folks feel this change
should be cherry-picked into 2.37.0?
The change also prompted a question to the mailing list[2] about
the difference in skew validation between ProcessContext and
FinishBundleContext (where there is no ability to compute skew, as
I understand it).
[0] https://issues.apache.org/jira/browse/BEAM-14064
[1] https://github.com/apache/beam/pull/16744
[2] https://lists.apache.org/thread/33kj1yjmn6kkvpl4vz02vnfyn2bpzycp
On Mon, Mar 7, 2022 at 12:41 PM Luke Cwik <lc...@google.com> wrote:
This is specifically a case where @ProcessElement saw
window X for element X0 and buffered it in memory, and then,
when processing window Y and element Y0, wanted to flush the
previously buffered element X0. This all occurred as part of
the same bundle.
In general, yes, outputting to an earlier window is problematic.
On Mon, Mar 7, 2022 at 9:32 AM Reuven Lax <re...@google.com>
wrote:
Outputting to an earlier window is problematic, as the
watermark can never be correct if a DoFn can move time
backwards arbitrarily.
On Mon, Mar 7, 2022 at 9:01 AM Luke Cwik
<lc...@google.com> wrote:
A good question would be: should I be able to output to
a different window from the current @ProcessElement
call, like what we can do from @FinishBundle, to handle
these buffering scenarios?
On Mon, Mar 7, 2022 at 8:53 AM Luke Cwik
<lc...@google.com> wrote:
The issue is that ElasticsearchIO is collecting
results from elements in window X and then trying
to output them in window Y when flushing the
batch. This exposed a bug where elements that were
being buffered were being output as part of a
different window than the one that produced them.
This became visible because validation was added
recently to ensure that when the pipeline is
processing elements in window X, any output with a
timestamp is valid for window X. Note that this
validation only occurs in @ProcessElement since
output is associated with the current window of
the input element that is being processed.
It is ok to do this in @FinishBundle since there
is no existing windowing context, and when you
output, that element is assigned to an appropriate
window.
Filed https://issues.apache.org/jira/browse/BEAM-14064
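As a rough illustration of the bug described above, here is a toy model in plain Java. Nothing in it is Beam or ElasticsearchIO code: windows are plain string labels, and the class and method names are hypothetical. It contrasts a flush that emits everything into the window currently being processed with one that remembers the window each element was buffered from.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of buffering across windows; window labels are strings, not Beam windows. */
public class WindowedBufferSketch {
    // Each entry holds {value, originWindow}.
    private final List<String[]> buffer = new ArrayList<>();

    public void add(String value, String originWindow) {
        buffer.add(new String[] {value, originWindow});
    }

    /** Problematic flush: every buffered value lands in the window being processed now. */
    public List<String> flushIntoCurrentWindow(String currentWindow) {
        List<String> out = new ArrayList<>();
        for (String[] entry : buffer) {
            out.add(entry[0] + "@" + currentWindow);
        }
        buffer.clear();
        return out;
    }

    /** Flush that keeps each value in the window it was buffered from. */
    public List<String> flushIntoOriginWindows() {
        List<String> out = new ArrayList<>();
        for (String[] entry : buffer) {
            out.add(entry[0] + "@" + entry[1]);
        }
        buffer.clear();
        return out;
    }

    public static void main(String[] args) {
        WindowedBufferSketch buggy = new WindowedBufferSketch();
        buggy.add("X0", "X");
        buggy.add("Y0", "Y");
        System.out.println(buggy.flushIntoCurrentWindow("Y")); // prints "[X0@Y, Y0@Y]"

        WindowedBufferSketch fixed = new WindowedBufferSketch();
        fixed.add("X0", "X");
        fixed.add("Y0", "Y");
        System.out.println(fixed.flushIntoOriginWindows()); // prints "[X0@X, Y0@Y]"
    }
}
```

In the first flush X0 ends up in window Y, which is the mismatch the new validation complains about.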
On Mon, Mar 7, 2022 at 7:44 AM Emils Solmanis
<emils.solma...@rvu.co.uk> wrote:
Hi all,
I think we’re hitting a regression in
ElasticsearchIO batch writing.
We’ve bisected it to being introduced in
2.35.0, and I’m reasonably certain it’s this
PR https://github.com/apache/beam/pull/15381
Our scenario is pretty trivial: we read off
Pubsub and write to Elastic in a streaming
job. The config for the source and sink is,
respectively:
    pipeline.apply(
        PubsubIO.readStrings().fromSubscription(subscription)
    ).apply(ParseJsons.of(OurObject::class.java))
        .setCoder(KryoCoder.of())
and
    ElasticsearchIO.write()
        .withUseStatefulBatches(true)
        .withMaxParallelRequestsPerWindow(1)
        .withMaxBufferingDuration(Duration.standardSeconds(30))
        // 5 bytes -> KiB -> MiB, so 5 MiB
        .withMaxBatchSizeBytes(5L * 1024 * 1024)
        // # of docs
        .withMaxBatchSize(1000)
        .withConnectionConfiguration(
            ElasticsearchIO.ConnectionConfiguration.create(
                arrayOf(host), "fubar", "_doc"
            ).withConnectTimeout(5000)
                .withSocketTimeout(30000)
        )
        .withRetryConfiguration(
            ElasticsearchIO.RetryConfiguration.create(
                10,
                // the duration is wall clock, against the connection and socket timeouts specified
                // above. I.e., 10 x 30s is gonna be more than 3 minutes, so if we're getting
                // 10 socket timeouts in a row, this would ignore the "10" part and terminate
                // after 6. The idea is that in a mixed failure mode, you'd get different timeouts
                // of different durations, and on average 10 x fails < 4m.
                // That said, 4m is arbitrary, so adjust as and when needed.
                Duration.standardMinutes(4)
            )
        )
        .withIdFn { f: JsonNode -> f["id"].asText() }
        .withIndexFn { f: JsonNode -> f["schema_name"].asText() }
        .withIsDeleteFn { f: JsonNode -> f["_action"].asText("noop") == "delete" }
We recently tried upgrading from 2.33 to 2.36 and
immediately hit a bug in the consumer, due to
alleged time skew, specifically:
    2022-03-07 10:48:37.886 GMT
    Error message from worker: java.lang.IllegalArgumentException:
    Cannot output with timestamp 2022-03-07T10:43:38.640Z. Output timestamps
    must be no earlier than the timestamp of the current input
    (2022-03-07T10:43:43.562Z) minus the allowed skew (0 milliseconds) and no
    later than 294247-01-10T04:00:54.775Z. See the
    DoFn#getAllowedTimestampSkew() Javadoc for details on changing the
    allowed skew.
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:446)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:422)
        org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn$ProcessContextAdapter.output(ElasticsearchIO.java:2364)
        org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2404)
        org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.addAndMaybeFlush(ElasticsearchIO.java:2419)
        org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOStatefulFn.processElement(ElasticsearchIO.java:2300)
I’ve bisected it and 2.34 works fine, 2.35 is
the first version this breaks, and it seems
like the code in the trace is largely added by
the PR linked above. The error usually claims
a skew of a few seconds, but obviously I can’t
override `getAllowedTimestampSkew()` on the
internal Elastic DoFn, and it’s marked
deprecated anyway.
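For reference, the lower-bound rule quoted in the error message can be sketched in a few lines of plain Java. This is a toy model written from the wording of the message, not the actual SimpleDoFnRunner code; the class and method names are made up, and the upper bound mentioned in the message is left out. The timestamps are the ones from the error above.

```java
import java.time.Duration;
import java.time.Instant;

/** Toy model of the timestamp check described in the error message; not Beam's code. */
public class SkewCheckSketch {

    /** True when the output timestamp is no earlier than (current input - allowed skew). */
    public static boolean isAllowed(Instant output, Instant currentInput, Duration allowedSkew) {
        return !output.isBefore(currentInput.minus(allowedSkew));
    }

    /** How far the output timestamp lies behind the current input (zero if it does not). */
    public static Duration lag(Instant output, Instant currentInput) {
        Duration d = Duration.between(output, currentInput);
        return d.isNegative() ? Duration.ZERO : d;
    }

    public static void main(String[] args) {
        Instant output = Instant.parse("2022-03-07T10:43:38.640Z");
        Instant input = Instant.parse("2022-03-07T10:43:43.562Z");

        System.out.println(isAllowed(output, input, Duration.ZERO)); // prints "false"
        System.out.println(lag(output, input).toMillis());           // prints "4922"
    }
}
```

With an allowed skew of 0 milliseconds the buffered element's timestamp is rejected because it is about 4.9 seconds behind the current input, which matches the "few seconds" of skew seen in the errors.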
I’m happy to raise a JIRA but I’m not 100%
sure what the code was intending to fix, and
additionally, I’d also be happy if someone
else can reproduce this or knows of similar
reports. I feel like what we’re doing is not
*that* uncommon a scenario, so I would have
thought someone else would have hit this by now.
Best,
Emils