Fix flushing of pushed-back data in Flink Runner on +Inf watermark

Previously, pushed-back data was flushed only when a +Inf watermark
arrived on the side input. That watermark can arrive before any data
has been received on the main input, in which case main-input elements
pushed back afterwards were never flushed. This change makes
DoFnOperator also flush pushed-back data whenever it receives a
main-input watermark after the +Inf side-input watermark has already
been seen.

Some tests were flaky because of this; one example is
ViewTest.testWindowedSideInputFixedToFixedWithDefault().


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/838035a4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/838035a4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/838035a4

Branch: refs/heads/master
Commit: 838035a4069b152143859e9b34570b15254d69b3
Parents: 9afe395
Author: Aljoscha Krettek <[email protected]>
Authored: Tue May 30 15:19:27 2017 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Tue May 30 15:23:55 2017 +0200

----------------------------------------------------------------------
 .../wrappers/streaming/DoFnOperator.java        | 77 +++++++++++++++-----
 1 file changed, 60 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/838035a4/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index d2ab7e1..e473046 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -19,6 +19,7 @@ package 
org.apache.beam.runners.flink.translation.wrappers.streaming;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.collect.Iterables;
 import java.io.DataInputStream;
@@ -129,6 +130,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
   protected transient long currentInputWatermark;
 
+  protected transient long currentSideInputWatermark;
+
   protected transient long currentOutputWatermark;
 
   private transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag;
@@ -197,6 +200,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     super.open();
 
     setCurrentInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
+    
setCurrentSideInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
     setCurrentOutputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
 
     sideInputReader = NullSideInputReader.of(sideInputs);
@@ -308,6 +312,21 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
   @Override
   public void close() throws Exception {
     super.close();
+
+    // sanity check: these should have been flushed out by +Inf watermarks
+    if (pushbackStateInternals != null) {
+      BagState<WindowedValue<InputT>> pushedBack =
+          pushbackStateInternals.state(StateNamespaces.global(), 
pushedBackTag);
+
+      Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read();
+      if (pushedBackContents != null) {
+        if (!Iterables.isEmpty(pushedBackContents)) {
+          String pushedBackString = Joiner.on(",").join(pushedBackContents);
+          throw new RuntimeException(
+              "Leftover pushed-back data: " + pushedBackString + ". This 
indicates a bug.");
+        }
+      }
+    }
     doFnInvoker.invokeTeardown();
   }
 
@@ -457,36 +476,56 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
       }
       pushbackDoFnRunner.finishBundle();
     }
+
+    // We do the check here because we are guaranteed to at least get the +Inf 
watermark on the
+    // main input when the job finishes.
+    if (currentSideInputWatermark >= 
BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+      // this means we will never see any more side input
+      // we also do the check here because we might have received the 
side-input MAX watermark
+      // before receiving any main-input data
+      emitAllPushedBackData();
+    }
   }
 
   @Override
   public void processWatermark2(Watermark mark) throws Exception {
-    if (mark.getTimestamp() == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+    setCurrentSideInputWatermark(mark.getTimestamp());
+    if (mark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
       // this means we will never see any more side input
-      pushbackDoFnRunner.startBundle();
+      emitAllPushedBackData();
 
-      BagState<WindowedValue<InputT>> pushedBack =
-          pushbackStateInternals.state(StateNamespaces.global(), 
pushedBackTag);
+      // maybe output a new watermark
+      processWatermark1(new Watermark(currentInputWatermark));
+    }
+  }
 
-      Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read();
-      if (pushedBackContents != null) {
-        for (WindowedValue<InputT> elem : pushedBackContents) {
+  /**
+   * Emits all pushed-back data. This should be used once we know that there 
will not be
+   * any future side input, i.e. that there is no point in waiting.
+   */
+  private void emitAllPushedBackData() throws Exception {
+    pushbackDoFnRunner.startBundle();
 
-          // we need to set the correct key in case the operator is
-          // a (keyed) window operator
-          setKeyContextElement1(new StreamRecord<>(elem));
+    BagState<WindowedValue<InputT>> pushedBack =
+        pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
 
-          doFnRunner.processElement(elem);
-        }
+    Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read();
+    if (pushedBackContents != null) {
+      for (WindowedValue<InputT> elem : pushedBackContents) {
+
+        // we need to set the correct key in case the operator is
+        // a (keyed) window operator
+        setKeyContextElement1(new StreamRecord<>(elem));
+
+        doFnRunner.processElement(elem);
       }
+    }
 
-      setPushedBackWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+    pushedBack.clear();
 
-      pushbackDoFnRunner.finishBundle();
+    setPushedBackWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
 
-      // maybe output a new watermark
-      processWatermark1(new Watermark(currentInputWatermark));
-    }
+    pushbackDoFnRunner.finishBundle();
   }
 
   @Override
@@ -610,6 +649,10 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     this.currentInputWatermark = currentInputWatermark;
   }
 
+  private void setCurrentSideInputWatermark(long currentInputWatermark) {
+    this.currentSideInputWatermark = currentInputWatermark;
+  }
+
   private void setCurrentOutputWatermark(long currentOutputWatermark) {
     this.currentOutputWatermark = currentOutputWatermark;
   }

Reply via email to