Repository: beam Updated Branches: refs/heads/master 43c44232d -> ef56ea495
Fix race condition when outputting pushed-back elements in Flink Runner. This affected the Flink Streaming Runner's DoFnOperator. The recent fix, which emits pushed-back data when a watermark is received on the first (main) input, placed that emission at the end of processWatermark1(). By that point the watermark has already been advanced, so the emitted data can become late. The fix moves the pushed-back element emission to the start of the method. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d17c0132 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d17c0132 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d17c0132 Branch: refs/heads/master Commit: d17c013240a14b12992cf00f30e5151c7e97f360 Parents: 43c4423 Author: Aljoscha Krettek <[email protected]> Authored: Fri Jun 2 15:57:01 2017 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Sun Jun 4 08:08:14 2017 +0200 ---------------------------------------------------------------------- .../wrappers/streaming/DoFnOperator.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d17c0132/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index e473046..594fe0e 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -445,6 +445,15 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> @Override public void 
processWatermark1(Watermark mark) throws Exception { + // We do the check here because we are guaranteed to at least get the +Inf watermark on the + // main input when the job finishes. + if (currentSideInputWatermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { + // this means we will never see any more side input + // we also do the check here because we might have received the side-input MAX watermark + // before receiving any main-input data + emitAllPushedBackData(); + } + if (keyCoder == null) { setCurrentInputWatermark(mark.getTimestamp()); long potentialOutputWatermark = @@ -476,15 +485,6 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> } pushbackDoFnRunner.finishBundle(); } - - // We do the check here because we are guaranteed to at least get the +Inf watermark on the - // main input when the job finishes. - if (currentSideInputWatermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { - // this means we will never see any more side input - // we also do the check here because we might have received the side-input MAX watermark - // before receiving any main-input data - emitAllPushedBackData(); - } } @Override
