This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 3eecb381f3fb412df6844cfc13b12bf265253926
Author: Aljoscha Krettek <[email protected]>
AuthorDate: Fri Jan 5 14:17:49 2018 +0100

    [BEAM-2140] Block DoFnOperator.close() if we have pending timers
    
    It can happen that the input operation finishes while we still have
    pending processing-time timers, for example from processing a Splittable
    DoFn. This change makes sure that we block as long as we have pending
    timers.
    
    This change also makes sure that we forward a +Inf watermark in close().
    We have to do this because it can happen that we get a +Inf watermark on
    input while we still have active watermark holds (which will get
    resolved when all pending timers are gone). With this change we make
    sure to send a +Inf watermark downstream once everything is resolved.
---
 .../wrappers/streaming/DoFnOperator.java           | 43 +++++++++++++++++-----
 1 file changed, 33 insertions(+), 10 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index dd2f9c4..37f56f5 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -371,18 +371,41 @@ public class DoFnOperator<InputT, OutputT>
 
   @Override
   public void close() throws Exception {
-    super.close();
-
-    // sanity check: these should have been flushed out by +Inf watermarks
-    if (!sideInputs.isEmpty() && nonKeyedStateInternals != null) {
-      BagState<WindowedValue<InputT>> pushedBack =
-          nonKeyedStateInternals.state(StateNamespaces.global(), pushedBackTag);
+    try {
 
-      Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read();
-      if (pushedBackContents != null && !Iterables.isEmpty(pushedBackContents)) {
-        String pushedBackString = Joiner.on(",").join(pushedBackContents);
+      // This is our last chance to block shutdown of this operator while
+      // there are still remaining processing-time timers. Flink will ignore pending
+      // processing-time timers when upstream operators have shut down and will also
+      // shut down this operator with pending processing-time timers.
+      while (this.numProcessingTimeTimers() > 0) {
+        getContainingTask().getCheckpointLock().wait(100);
+      }
+      if (this.numProcessingTimeTimers() > 0) {
         throw new RuntimeException(
-            "Leftover pushed-back data: " + pushedBackString + ". This indicates a bug.");
+            "There are still processing-time timers left, this indicates a bug");
+      }
+
+      // make sure we send a +Inf watermark downstream. It can happen that we receive +Inf
+      // in processWatermark*() but have holds, so we have to re-evaluate here.
+      processWatermark(new Watermark(Long.MAX_VALUE));
+      if (currentOutputWatermark < Long.MAX_VALUE) {
+        throw new RuntimeException("There are still watermark holds. Watermark held at "
+            + keyedStateInternals.watermarkHold().getMillis() + ".");
+      }
+    } finally {
+      super.close();
+
+      // sanity check: these should have been flushed out by +Inf watermarks
+      if (!sideInputs.isEmpty() && nonKeyedStateInternals != null) {
+        BagState<WindowedValue<InputT>> pushedBack =
+            nonKeyedStateInternals.state(StateNamespaces.global(), pushedBackTag);
+
+        Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read();
+        if (pushedBackContents != null && !Iterables.isEmpty(pushedBackContents)) {
+          String pushedBackString = Joiner.on(",").join(pushedBackContents);
+          throw new RuntimeException(
+              "Leftover pushed-back data: " + pushedBackString + ". This indicates a bug.");
+        }
       }
     }
   }
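
For readers unfamiliar with the pattern, the following is a minimal standalone sketch of the two ideas in this commit: block close() while processing-time timers are pending, then re-evaluate the watermark so that +Inf is forwarded once nothing holds it back. It is not the Flink or Beam API. The class BlockingCloseSketch, its lock object, and the rule that any pending timer acts as a watermark hold are all assumptions made for illustration only.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for an operator; not DoFnOperator and not Flink API. */
final class BlockingCloseSketch {
  // Plays the role of the checkpoint lock in this sketch (assumption).
  private final Object lock = new Object();
  private final ScheduledExecutorService timerService =
      Executors.newSingleThreadScheduledExecutor();
  private int pendingTimers = 0; // guarded by lock
  private long outputWatermark = Long.MIN_VALUE; // guarded by lock

  /** Registers a processing-time timer that fires after the given delay. */
  void registerTimer(long delayMillis) {
    synchronized (lock) {
      pendingTimers++;
    }
    timerService.schedule(
        () -> {
          synchronized (lock) {
            pendingTimers--;
            lock.notifyAll(); // wake up a close() call that is waiting
          }
        },
        delayMillis,
        TimeUnit.MILLISECONDS);
  }

  /** Advances the output watermark unless a pending timer holds it back. */
  void processWatermark(long inputWatermark) {
    synchronized (lock) {
      // Assumption for this sketch: any pending timer acts as a hold.
      if (pendingTimers == 0) {
        outputWatermark = Math.max(outputWatermark, inputWatermark);
      }
    }
  }

  /** Blocks until all timers have fired, then forwards the final watermark. */
  void close() throws InterruptedException {
    try {
      synchronized (lock) {
        while (pendingTimers > 0) {
          lock.wait(100); // releases the lock so the timer thread can run
        }
        // Re-evaluate: +Inf may have arrived earlier while a hold was active.
        processWatermark(Long.MAX_VALUE);
        if (outputWatermark < Long.MAX_VALUE) {
          throw new IllegalStateException("There are still watermark holds.");
        }
      }
    } finally {
      timerService.shutdown(); // cleanup runs even if the checks above fail
    }
  }

  int pendingTimers() {
    synchronized (lock) {
      return pendingTimers;
    }
  }

  long outputWatermark() {
    synchronized (lock) {
      return outputWatermark;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    BlockingCloseSketch op = new BlockingCloseSketch();
    op.registerTimer(200);
    op.processWatermark(Long.MAX_VALUE); // held back: a timer is still pending
    op.close(); // returns only after the timer has fired
    System.out.println(
        "final watermark forwarded: " + (op.outputWatermark() == Long.MAX_VALUE));
  }
}
```

The timed wait mirrors the wait(100) call in the diff: waiting on the lock releases it, so the thread that fires timers can acquire it and make progress while close() is blocked.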
