Fix flushing of pushed-back data in Flink Runner on +Inf watermark. Before, we only flushed all pushed-back data when receiving a +Inf watermark on the side input. However, that watermark can arrive before any data has been received on the main input, so main-input data pushed back afterwards was never flushed. This changes DoFnOperator to also flush when it receives a main-input watermark after the +Inf watermark has already been received on the side input.
Some tests were flaky because of this. One example was ViewTest.testWindowedSideInputFixedToFixedWithDefault(). Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/838035a4 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/838035a4 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/838035a4 Branch: refs/heads/master Commit: 838035a4069b152143859e9b34570b15254d69b3 Parents: 9afe395 Author: Aljoscha Krettek <[email protected]> Authored: Tue May 30 15:19:27 2017 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Tue May 30 15:23:55 2017 +0200 ---------------------------------------------------------------------- .../wrappers/streaming/DoFnOperator.java | 77 +++++++++++++++----- 1 file changed, 60 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/838035a4/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index d2ab7e1..e473046 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming; import static org.apache.flink.util.Preconditions.checkArgument; +import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.collect.Iterables; import java.io.DataInputStream; @@ -129,6 +130,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> 
protected transient long currentInputWatermark; + protected transient long currentSideInputWatermark; + protected transient long currentOutputWatermark; private transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag; @@ -197,6 +200,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> super.open(); setCurrentInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()); + setCurrentSideInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()); setCurrentOutputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()); sideInputReader = NullSideInputReader.of(sideInputs); @@ -308,6 +312,21 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> @Override public void close() throws Exception { super.close(); + + // sanity check: these should have been flushed out by +Inf watermarks + if (pushbackStateInternals != null) { + BagState<WindowedValue<InputT>> pushedBack = + pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); + + Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read(); + if (pushedBackContents != null) { + if (!Iterables.isEmpty(pushedBackContents)) { + String pushedBackString = Joiner.on(",").join(pushedBackContents); + throw new RuntimeException( + "Leftover pushed-back data: " + pushedBackString + ". This indicates a bug."); + } + } + } doFnInvoker.invokeTeardown(); } @@ -457,36 +476,56 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> } pushbackDoFnRunner.finishBundle(); } + + // We do the check here because we are guaranteed to at least get the +Inf watermark on the + // main input when the job finishes. 
+ if (currentSideInputWatermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { + // this means we will never see any more side input + // we also do the check here because we might have received the side-input MAX watermark + // before receiving any main-input data + emitAllPushedBackData(); + } } @Override public void processWatermark2(Watermark mark) throws Exception { - if (mark.getTimestamp() == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { + setCurrentSideInputWatermark(mark.getTimestamp()); + if (mark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { // this means we will never see any more side input - pushbackDoFnRunner.startBundle(); + emitAllPushedBackData(); - BagState<WindowedValue<InputT>> pushedBack = - pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); + // maybe output a new watermark + processWatermark1(new Watermark(currentInputWatermark)); + } + } - Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read(); - if (pushedBackContents != null) { - for (WindowedValue<InputT> elem : pushedBackContents) { + /** + * Emits all pushed-back data. This should be used once we know that there will not be + * any future side input, i.e. that there is no point in waiting. 
+ */ + private void emitAllPushedBackData() throws Exception { + pushbackDoFnRunner.startBundle(); - // we need to set the correct key in case the operator is - // a (keyed) window operator - setKeyContextElement1(new StreamRecord<>(elem)); + BagState<WindowedValue<InputT>> pushedBack = + pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); - doFnRunner.processElement(elem); - } + Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read(); + if (pushedBackContents != null) { + for (WindowedValue<InputT> elem : pushedBackContents) { + + // we need to set the correct key in case the operator is + // a (keyed) window operator + setKeyContextElement1(new StreamRecord<>(elem)); + + doFnRunner.processElement(elem); } + } - setPushedBackWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()); + pushedBack.clear(); - pushbackDoFnRunner.finishBundle(); + setPushedBackWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()); - // maybe output a new watermark - processWatermark1(new Watermark(currentInputWatermark)); - } + pushbackDoFnRunner.finishBundle(); } @Override @@ -610,6 +649,10 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> this.currentInputWatermark = currentInputWatermark; } + private void setCurrentSideInputWatermark(long currentInputWatermark) { + this.currentSideInputWatermark = currentInputWatermark; + } + private void setCurrentOutputWatermark(long currentOutputWatermark) { this.currentOutputWatermark = currentOutputWatermark; }
