This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch release-2.21.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.21.0 by this push:
     new f2ee530  Merge pull request #11362: [BEAM-9733] Improve watermark and 
timer handling
     new 36f19f0  Merge pull request #11518 from mxm/release-2.21.0
f2ee530 is described below

commit f2ee530ad52e4e79b0f76b1abb25a5ebb98d5a39
Author: Maximilian Michels <[email protected]>
AuthorDate: Fri Apr 24 12:45:33 2020 +0200

    Merge pull request #11362: [BEAM-9733] Improve watermark and timer handling
    
    Backport of #11362 / 643945a8e4
---
 runners/flink/flink_runner.gradle                  |   5 +-
 .../functions/ImpulseSourceFunction.java           |  12 +
 .../wrappers/streaming/DoFnOperator.java           | 213 ++++++++++------
 .../streaming/ExecutableStageDoFnOperator.java     | 204 +++++++++++----
 .../wrappers/streaming/SplittableDoFnOperator.java |   4 +
 .../beam/runners/flink/FlinkSavepointTest.java     |  34 +--
 .../runners/flink/PortableTimersExecutionTest.java |  35 ++-
 .../functions/ImpulseSourceFunctionTest.java       |  17 +-
 .../wrappers/streaming/DoFnOperatorTest.java       |   5 +-
 .../streaming/ExecutableStageDoFnOperatorTest.java | 273 +++++++++++++++++----
 .../wrappers/streaming/WindowDoFnOperatorTest.java |   4 +-
 11 files changed, 587 insertions(+), 219 deletions(-)

diff --git a/runners/flink/flink_runner.gradle 
b/runners/flink/flink_runner.gradle
index 460099a..f5ecc69 100644
--- a/runners/flink/flink_runner.gradle
+++ b/runners/flink/flink_runner.gradle
@@ -111,7 +111,10 @@ spotless {
 
 test {
   systemProperty "log4j.configuration", "log4j-test.properties"
-  //systemProperty "org.slf4j.simpleLogger.defaultLogLevel", "debug"
+  // Change log level to debug:
+  // systemProperty "org.slf4j.simpleLogger.defaultLogLevel", "debug"
+  // Change log level to debug only for the package and nested packages:
+  // systemProperty 
"org.slf4j.simpleLogger.log.org.apache.beam.runners.flink.translation.wrappers.streaming",
 "debug"
   jvmArgs "-XX:-UseGCOverheadLimit"
   if (System.getProperty("beamSurefireArgline")) {
     jvmArgs System.getProperty("beamSurefireArgline")
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ImpulseSourceFunction.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ImpulseSourceFunction.java
index 4e50ce1..6c610de 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ImpulseSourceFunction.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ImpulseSourceFunction.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
 
 /**
  * Source function which sends a single global impulse to a downstream 
operator. It may keep the
@@ -58,6 +59,17 @@ public class ImpulseSourceFunction
         impulseEmitted.add(true);
       }
     }
+    // Always emit a final watermark.
+    // (1) In case we didn't restore the pipeline, this is important to close 
the global window;
+    // if no operator holds back this watermark.
+    // (2) In case we are restoring the pipeline, this is needed to initialize 
the operators with
+    // the current watermark and trigger execution of any pending timers.
+    sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
+    // Wait to allow checkpoints of the pipeline
+    waitToEnsureCheckpointingWorksCorrectly();
+  }
+
+  private void waitToEnsureCheckpointingWorksCorrectly() {
     // Do nothing, but still look busy ...
     // we can't return here since Flink requires that all operators stay up,
     // otherwise checkpointing would not work correctly anymore
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index a6d6f1b..5dcd994 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -116,6 +116,8 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.util.OutputTag;
 import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Flink operator for executing {@link DoFn DoFns}.
@@ -123,11 +125,15 @@ import org.joda.time.Instant;
  * @param <InputT> the input type of the {@link DoFn}
  * @param <OutputT> the output type of the {@link DoFn}
  */
+// We use Flink's lifecycle methods to initialize transient fields
+@SuppressFBWarnings("SE_TRANSIENT_FIELD_NOT_RESTORED")
 public class DoFnOperator<InputT, OutputT> extends 
AbstractStreamOperator<WindowedValue<OutputT>>
     implements OneInputStreamOperator<WindowedValue<InputT>, 
WindowedValue<OutputT>>,
         TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue, 
WindowedValue<OutputT>>,
         Triggerable<ByteBuffer, TimerData> {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(DoFnOperator.class);
+
   protected DoFn<InputT, OutputT> doFn;
 
   protected final SerializablePipelineOptions serializedOptions;
@@ -206,14 +212,16 @@ public class DoFnOperator<InputT, OutputT> extends 
AbstractStreamOperator<Window
   private transient volatile long elementCount;
   /** Time that the last bundle was finished (to set the timer). */
   private transient volatile long lastFinishBundleTime;
+  /** Callback to be executed before the current bundle is started. */
+  private transient volatile Runnable preBundleCallback;
   /** Callback to be executed after the current bundle was finished. */
   private transient volatile Runnable bundleFinishedCallback;
 
   // Watermark state.
   // Volatile because these can be set in two mutually exclusive threads (see 
above).
-  protected transient volatile long currentInputWatermark;
-  protected transient volatile long currentSideInputWatermark;
-  protected transient volatile long currentOutputWatermark;
+  private transient volatile long currentInputWatermark;
+  private transient volatile long currentSideInputWatermark;
+  private transient volatile long currentOutputWatermark;
   private transient volatile long pushedBackWatermark;
 
   /** Constructor for DoFnOperator. */
@@ -353,9 +361,9 @@ public class DoFnOperator<InputT, OutputT> extends 
AbstractStreamOperator<Window
       pushedBackElementsHandler = 
NonKeyedPushedBackElementsHandler.create(listState);
     }
 
-    setCurrentInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
-    
setCurrentSideInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
-    setCurrentOutputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
+    currentInputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
+    currentSideInputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
+    currentOutputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
 
     sideInputReader = NullSideInputReader.of(sideInputs);
 
@@ -371,9 +379,9 @@ public class DoFnOperator<InputT, OutputT> extends 
AbstractStreamOperator<Window
       Stream<WindowedValue<InputT>> pushedBack = 
pushedBackElementsHandler.getElements();
       long min =
           pushedBack.map(v -> 
v.getTimestamp().getMillis()).reduce(Long.MAX_VALUE, Math::min);
-      setPushedBackWatermark(min);
+      pushedBackWatermark = min;
     } else {
-      setPushedBackWatermark(Long.MAX_VALUE);
+      pushedBackWatermark = Long.MAX_VALUE;
     }
 
     // StatefulPardo or WindowDoFn
@@ -549,15 +557,20 @@ public class DoFnOperator<InputT, OutputT> extends 
AbstractStreamOperator<Window
     }
   }
 
-  protected long getPushbackWatermarkHold() {
-    return pushedBackWatermark;
+  public long getEffectiveInputWatermark() {
+    // hold back by the pushed back values waiting for side inputs
+    return Math.min(pushedBackWatermark, currentInputWatermark);
+  }
+
+  public long getCurrentOutputWatermark() {
+    return currentOutputWatermark;
   }
 
-  protected void setPushedBackWatermark(long watermark) {
-    pushedBackWatermark = watermark;
+  protected final void setPreBundleCallback(Runnable callback) {
+    this.preBundleCallback = callback;
   }
 
-  protected void setBundleFinishedCallback(Runnable callback) {
+  protected final void setBundleFinishedCallback(Runnable callback) {
     this.bundleFinishedCallback = callback;
   }
 
@@ -581,7 +594,7 @@ public class DoFnOperator<InputT, OutputT> extends 
AbstractStreamOperator<Window
       min = Math.min(min, pushedBackValue.getTimestamp().getMillis());
       pushedBackElementsHandler.pushBack(pushedBackValue);
     }
-    setPushedBackWatermark(min);
+    pushedBackWatermark = min;
 
     checkInvokeFinishBundleByCount();
   }
@@ -635,7 +648,7 @@ public class DoFnOperator<InputT, OutputT> extends 
AbstractStreamOperator<Window
       min = Math.min(min, pushedBackValue.getTimestamp().getMillis());
       pushedBackElementsHandler.pushBack(pushedBackValue);
     }
-    setPushedBackWatermark(min);
+    pushedBackWatermark = min;
 
     checkInvokeFinishBundleByCount();
 
@@ -644,12 +657,12 @@ public class DoFnOperator<InputT, OutputT> extends 
AbstractStreamOperator<Window
   }
 
   @Override
-  public void processWatermark(Watermark mark) throws Exception {
+  public final void processWatermark(Watermark mark) throws Exception {
     processWatermark1(mark);
   }
 
   @Override
-  public void processWatermark1(Watermark mark) throws Exception {
+  public final void processWatermark1(Watermark mark) throws Exception {
     // We do the check here because we are guaranteed to at least get the +Inf 
watermark on the
     // main input when the job finishes.
     if (currentSideInputWatermark >= 
BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
@@ -659,46 +672,69 @@ public class DoFnOperator<InputT, OutputT> extends 
AbstractStreamOperator<Window
       emitAllPushedBackData();
     }
 
-    setCurrentInputWatermark(mark.getTimestamp());
+    currentInputWatermark = mark.getTimestamp();
 
-    if (keyCoder == null) {
-      long potentialOutputWatermark = Math.min(getPushbackWatermarkHold(), 
currentInputWatermark);
-      if (potentialOutputWatermark > currentOutputWatermark) {
-        setCurrentOutputWatermark(potentialOutputWatermark);
-        emitWatermark(currentOutputWatermark);
-      }
-    } else {
-      // hold back by the pushed back values waiting for side inputs
-      long pushedBackInputWatermark = Math.min(getPushbackWatermarkHold(), 
mark.getTimestamp());
+    long inputWatermarkHold = 
applyInputWatermarkHold(getEffectiveInputWatermark());
+    if (keyCoder != null) {
+      timeServiceManager.advanceWatermark(new Watermark(inputWatermarkHold));
+    }
 
-      timeServiceManager.advanceWatermark(new 
Watermark(pushedBackInputWatermark));
+    long potentialOutputWatermark =
+        applyOutputWatermarkHold(
+            currentOutputWatermark, 
computeOutputWatermark(inputWatermarkHold));
+    maybeEmitWatermark(potentialOutputWatermark);
+  }
 
-      Instant watermarkHold = keyedStateInternals.watermarkHold();
+  /**
+   * Allows to apply a hold to the input watermark. By default, just passes 
the input watermark
+   * through.
+   */
+  public long applyInputWatermarkHold(long inputWatermark) {
+    return inputWatermark;
+  }
 
-      long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), 
getPushbackWatermarkHold());
-      combinedWatermarkHold =
-          Math.min(combinedWatermarkHold, 
timerInternals.getMinOutputTimestampMs());
-      long potentialOutputWatermark = Math.min(pushedBackInputWatermark, 
combinedWatermarkHold);
+  /**
+   * Allows to apply a hold to the output watermark before it is sent out. By 
default, just passes
+   * the potential output watermark through which will make it the new output 
watermark.
+   *
+   * @param currentOutputWatermark the current output watermark
+   * @param potentialOutputWatermark The potential new output watermark which 
can be adjusted, if
+   *     needed. The input watermark hold has already been applied.
+   * @return The new output watermark which will be emitted.
+   */
+  public long applyOutputWatermarkHold(long currentOutputWatermark, long 
potentialOutputWatermark) {
+    return potentialOutputWatermark;
+  }
 
-      if (potentialOutputWatermark > currentOutputWatermark) {
-        setCurrentOutputWatermark(potentialOutputWatermark);
-        emitWatermark(currentOutputWatermark);
-      }
+  private long computeOutputWatermark(long inputWatermarkHold) {
+    final long potentialOutputWatermark;
+    if (keyCoder == null) {
+      potentialOutputWatermark = inputWatermarkHold;
+    } else {
+      Instant watermarkHold = keyedStateInternals.watermarkHold();
+      long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), 
inputWatermarkHold);
+      potentialOutputWatermark =
+          Math.min(combinedWatermarkHold, 
timerInternals.getMinOutputTimestampMs());
     }
+    return potentialOutputWatermark;
   }
 
-  private void emitWatermark(long watermark) {
-    // Must invoke finishBatch before emit the +Inf watermark otherwise there 
are some late events.
-    if (watermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
-      invokeFinishBundle();
+  private void maybeEmitWatermark(long watermark) {
+    if (watermark > currentOutputWatermark) {
+      // Must invoke finishBundle before emitting the +Inf watermark, otherwise 
there are some late
+      // events.
+      if (watermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+        invokeFinishBundle();
+      }
+      LOG.debug("Emitting watermark {}", watermark);
+      currentOutputWatermark = watermark;
+      output.emitWatermark(new Watermark(watermark));
     }
-    output.emitWatermark(new Watermark(watermark));
   }
 
   @Override
-  public void processWatermark2(Watermark mark) throws Exception {
-
-    setCurrentSideInputWatermark(mark.getTimestamp());
+  public final void processWatermark2(Watermark mark) throws Exception {
+    currentSideInputWatermark = mark.getTimestamp();
     if (mark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
       // this means we will never see any more side input
       emitAllPushedBackData();
@@ -727,8 +763,7 @@ public class DoFnOperator<InputT, OutputT> extends 
AbstractStreamOperator<Window
     }
 
     pushedBackElementsHandler.clear();
-
-    setPushedBackWatermark(Long.MAX_VALUE);
+    pushedBackWatermark = Long.MAX_VALUE;
   }
 
   /**
@@ -743,7 +778,11 @@ public class DoFnOperator<InputT, OutputT> extends 
AbstractStreamOperator<Window
    */
   private void checkInvokeStartBundle() {
     if (!bundleStarted) {
+      LOG.debug("Starting bundle.");
       outputManager.flushBuffer();
+      if (preBundleCallback != null) {
+        preBundleCallback.run();
+      }
       pushbackDoFnRunner.startBundle();
       bundleStarted = true;
     }
@@ -773,15 +812,17 @@ public class DoFnOperator<InputT, OutputT> extends 
AbstractStreamOperator<Window
 
   protected final void invokeFinishBundle() {
     if (bundleStarted) {
+      LOG.debug("Finishing bundle.");
       pushbackDoFnRunner.finishBundle();
+      LOG.debug("Finished bundle. Element count: {}", elementCount);
       elementCount = 0L;
       lastFinishBundleTime = 
getProcessingTimeService().getCurrentProcessingTime();
       bundleStarted = false;
       // callback only after current bundle was fully finalized
       // it could start a new bundle, for example resulting from timer 
processing
       if (bundleFinishedCallback != null) {
+        LOG.debug("Invoking bundle finish callback.");
         bundleFinishedCallback.run();
-        bundleFinishedCallback = null;
       }
     }
   }
@@ -843,6 +884,11 @@ public class DoFnOperator<InputT, OutputT> extends 
AbstractStreamOperator<Window
 
   // allow overriding this in WindowDoFnOperator
   protected void fireTimer(TimerData timerData) {
+    LOG.debug(
+        "Firing timer: {} at {} with output time {}",
+        timerData.getTimerId(),
+        timerData.getTimestamp().getMillis(),
+        timerData.getOutputTimestamp().getMillis());
     StateNamespace namespace = timerData.getNamespace();
     // This is a user timer, so namespace must be WindowNamespace
     checkArgument(namespace instanceof WindowNamespace);
@@ -857,18 +903,6 @@ public class DoFnOperator<InputT, OutputT> extends 
AbstractStreamOperator<Window
         timerData.getDomain());
   }
 
-  private void setCurrentInputWatermark(long currentInputWatermark) {
-    this.currentInputWatermark = currentInputWatermark;
-  }
-
-  private void setCurrentSideInputWatermark(long currentInputWatermark) {
-    this.currentSideInputWatermark = currentInputWatermark;
-  }
-
-  private void setCurrentOutputWatermark(long currentOutputWatermark) {
-    this.currentOutputWatermark = currentOutputWatermark;
-  }
-
   @SuppressWarnings("unchecked")
   Coder<InputT> getInputCoder() {
     return (Coder<InputT>) 
Iterables.getOnlyElement(windowedInputCoder.getCoderArguments());
@@ -1259,6 +1293,11 @@ public class DoFnOperator<InputT, OutputT> extends 
AbstractStreamOperator<Window
     @Override
     public void setTimer(TimerData timer) {
       try {
+        LOG.debug(
+            "Setting timer: {} at {} with output time {}",
+            timer.getTimerId(),
+            timer.getTimestamp().getMillis(),
+            timer.getOutputTimestamp().getMillis());
         String contextTimerId = getContextTimerId(timer.getTimerId(), 
timer.getNamespace());
         // Only one timer can exist at a time for a given timer id and context.
         // If a timer gets set twice in the same context, the second must
@@ -1367,7 +1406,7 @@ public class DoFnOperator<InputT, OutputT> extends 
AbstractStreamOperator<Window
 
     @Override
     public Instant currentInputWatermarkTime() {
-      return new Instant(Math.min(currentInputWatermark, 
getPushbackWatermarkHold()));
+      return new Instant(getEffectiveInputWatermark());
     }
 
     @Nullable
@@ -1376,30 +1415,44 @@ public class DoFnOperator<InputT, OutputT> extends 
AbstractStreamOperator<Window
       return new Instant(currentOutputWatermark);
     }
 
+    /**
+     * Check whether event time timers lower than or equal to the given timestamp 
exist. Caution: This is
+     * scoped by the current key.
+     */
+    public boolean hasPendingEventTimeTimers(long maxTimestamp) throws 
Exception {
+      for (TimerData timer : pendingTimersById.values()) {
+        if (timer.getDomain() == TimeDomain.EVENT_TIME
+            && timer.getTimestamp().getMillis() <= maxTimestamp) {
+          return true;
+        }
+      }
+      return false;
+    }
+
     /** Unique contextual id of a timer. Used to look up any existing timers 
in a context. */
     private String getContextTimerId(String timerId, StateNamespace namespace) 
{
       return timerId + namespace.stringKey();
     }
+  }
 
-    /**
-     * In Beam, a timer with timestamp {@code T} is only illegible for firing 
when the time has
-     * moved past this time stamp, i.e. {@code T < current_time}. In the case 
of event time,
-     * current_time is the watermark, in the case of processing time it is the 
system time.
-     *
-     * <p>Flink's TimerService has different semantics because it only ensures 
{@code T <=
-     * current_time}.
-     *
-     * <p>To make up for this, we need to add one millisecond to Flink's 
internal timer timestamp.
-     * Note that we do not modify Beam's timestamp and we are not exposing 
Flink's timestamp.
-     *
-     * <p>See also https://jira.apache.org/jira/browse/BEAM-3863
-     */
-    private long adjustTimestampForFlink(long beamTimerTimestamp) {
-      if (beamTimerTimestamp == Long.MAX_VALUE) {
-        // We would overflow, do not adjust timestamp
-        return Long.MAX_VALUE;
-      }
-      return beamTimerTimestamp + 1;
+  /**
+   * In Beam, a timer with timestamp {@code T} is only eligible for firing 
when the time has moved
+   * past this time stamp, i.e. {@code T < current_time}. In the case of event 
time, current_time is
+   * the watermark, in the case of processing time it is the system time.
+   *
+   * <p>Flink's TimerService has different semantics because it only ensures 
{@code T <=
+   * current_time}.
+   *
+   * <p>To make up for this, we need to add one millisecond to Flink's 
internal timer timestamp.
+   * Note that we do not modify Beam's timestamp and we are not exposing 
Flink's timestamp.
+   *
+   * <p>See also https://jira.apache.org/jira/browse/BEAM-3863
+   */
+  static long adjustTimestampForFlink(long beamTimerTimestamp) {
+    if (beamTimerTimestamp == Long.MAX_VALUE) {
+      // We would overflow, do not adjust timestamp
+      return Long.MAX_VALUE;
     }
+    return beamTimerTimestamp + 1;
   }
 }
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 594ec59..1bbd1e5 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -17,9 +17,11 @@
  */
 package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
+import static 
org.apache.beam.runners.core.StatefulDoFnRunner.TimeInternalsCleanupTimer.GC_TIMER_ID;
 import static 
org.apache.beam.runners.flink.translation.utils.FlinkPortableRunnerUtils.requiresTimeSortedInput;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
@@ -75,6 +77,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.function.ThrowingFunction;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.state.TimeDomain;
@@ -102,6 +105,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -114,6 +118,8 @@ import org.slf4j.LoggerFactory;
  *
  * <p>TODO Integrate support for progress updates and metrics
  */
+// We use Flink's lifecycle methods to initialize transient fields
+@SuppressFBWarnings("SE_TRANSIENT_FIELD_NOT_RESTORED")
 public class ExecutableStageDoFnOperator<InputT, OutputT> extends 
DoFnOperator<InputT, OutputT> {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ExecutableStageDoFnOperator.class);
@@ -135,11 +141,17 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> 
extends DoFnOperator<I
   private transient ExecutableStage executableStage;
   private transient SdkHarnessDoFnRunner<InputT, OutputT> sdkHarnessRunner;
 
-  /**
-   * Watermark held back due to async processing. Volatile due to multiple 
mutually exclusive
-   * threads. Please see the description in DoFnOperator.
-   */
-  private transient volatile long backupWatermarkHold = Long.MIN_VALUE;
+  /** The minimum event time timer timestamp observed during the last bundle. 
*/
+  private transient long minEventTimeTimerTimestampInLastBundle;
+
+  /** The minimum event time timer timestamp observed in the current bundle. */
+  private transient long minEventTimeTimerTimestampInCurrentBundle;
+
+  /** The input watermark before the current bundle started. */
+  private transient long inputWatermarkBeforeBundleStart;
+
+  /** Flag indicating whether the operator has been closed. */
+  private transient boolean closed;
 
   /** Constructor. */
   public ExecutableStageDoFnOperator(
@@ -220,6 +232,11 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> 
extends DoFnOperator<I
           }
         };
 
+    minEventTimeTimerTimestampInCurrentBundle = Long.MAX_VALUE;
+    minEventTimeTimerTimestampInLastBundle = Long.MAX_VALUE;
+    super.setPreBundleCallback(this::preBundleStartCallback);
+    super.setBundleFinishedCallback(this::finishBundleCallback);
+
     // This will call {@code createWrappingDoFnRunner} which needs the above 
dependencies.
     super.open();
   }
@@ -436,8 +453,10 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> 
extends DoFnOperator<I
     return this.<ByteBuffer>getKeyedStateBackend().getCurrentKey();
   }
 
-  private void setTimer(Timer<?> timerElement, TimerInternals.TimerData 
timerData) {
+  void setTimer(Timer<?> timerElement, TimerInternals.TimerData timerData) {
     try {
+      Preconditions.checkState(
+          sdkHarnessRunner.isBundleInProgress(), "Bundle was expected to be in 
progress!!");
       LOG.debug("Setting timer: {} {}", timerElement, timerData);
       // KvToByteBufferKeySelector returns the key encoded, it doesn't care 
about the
       // window, timestamp or pane information.
@@ -455,6 +474,12 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> 
extends DoFnOperator<I
               timerData.getNamespace(), timerData.getTimerId(), 
timerData.getDomain());
         } else {
           timerInternals.setTimer(timerData);
+          if (!timerData.getTimerId().equals(GC_TIMER_ID)) {
+            minEventTimeTimerTimestampInCurrentBundle =
+                Math.min(
+                    minEventTimeTimerTimestampInCurrentBundle,
+                    
adjustTimestampForFlink(timerData.getTimestamp().getMillis()));
+          }
         }
       }
     } catch (Exception e) {
@@ -473,6 +498,18 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> 
extends DoFnOperator<I
   }
 
   @Override
+  public void close() throws Exception {
+    closed = true;
+    // We might still be holding back the watermark and Flink does not trigger 
the timer
+    // callback for watermark advancement anymore.
+    processWatermark1(Watermark.MAX_WATERMARK);
+    while (getCurrentOutputWatermark() < 
Watermark.MAX_WATERMARK.getTimestamp()) {
+      invokeFinishBundle();
+    }
+    super.close();
+  }
+
+  @Override
   public void dispose() throws Exception {
     // may be called multiple times when an exception is thrown
     if (stageContext != null) {
@@ -517,7 +554,27 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> 
extends DoFnOperator<I
   }
 
   @Override
-  public void processWatermark(Watermark mark) throws Exception {
+  public long applyInputWatermarkHold(long inputWatermark) {
+    // We must wait until all elements/timers have been processed (happens 
async!) before the
+    // watermark can be progressed. We can't just advance the input watermark 
until at least one
+    // bundle has been completed since the watermark has been received. 
Otherwise we potentially
+    // violate the Beam timer contract which allows for already-set timers to 
be modified by
+    // successive elements.
+    //
+    // For example, we set a timer at t1, then finish the bundle (e.g. due to 
the bundle timeout),
+    // then receive an element which updates the timer to fire at t2, and then 
receive a watermark
+    // w1, where w1 > t2 > t1. If we do not hold back the input watermark 
here, w1 would fire the
+    // initial timer at t1, but we want to make sure to fire the updated 
version of the timer at
+    // t2.
+    if (sdkHarnessRunner.isBundleInProgress()) {
+      return inputWatermarkBeforeBundleStart;
+    } else {
+      return inputWatermark;
+    }
+  }
+
+  @Override
+  public long applyOutputWatermarkHold(long currentOutputWatermark, long 
potentialOutputWatermark) {
     // Due to the asynchronous communication with the SDK harness,
     // a bundle might still be in progress and not all items have
     // yet been received from the SDK harness. If we just set this
@@ -544,30 +601,47 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> 
extends DoFnOperator<I
     // every watermark. So we have implemented 2) below.
     //
     if (sdkHarnessRunner.isBundleInProgress()) {
-      if (mark.getTimestamp() >= 
BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
-        invokeFinishBundle();
-        setPushedBackWatermark(Long.MAX_VALUE);
+      if (minEventTimeTimerTimestampInLastBundle < Long.MAX_VALUE) {
+        // We can safely advance the watermark to before the last bundle's 
minimum event timer
+        // but not past the potential output watermark which includes holds to 
the input watermark.
+        return Math.min(minEventTimeTimerTimestampInLastBundle - 1, 
potentialOutputWatermark);
       } else {
-        // It is not safe to advance the output watermark yet, so add a hold 
on the current
-        // output watermark.
-        backupWatermarkHold = Math.max(backupWatermarkHold, 
getPushbackWatermarkHold());
-        setPushedBackWatermark(Math.min(currentOutputWatermark, 
backupWatermarkHold));
-        super.setBundleFinishedCallback(
-            () -> {
-              try {
-                LOG.debug("processing pushed back watermark: {}", mark);
-                // at this point the bundle is finished, allow the watermark 
to pass
-                // we are restoring the previous hold in case it was already 
set for side inputs
-                setPushedBackWatermark(backupWatermarkHold);
-                super.processWatermark(mark);
-              } catch (Exception e) {
-                throw new RuntimeException(
-                    "Failed to process pushed back watermark after finished 
bundle.", e);
-              }
-            });
+        // We don't have any information yet, use the current output watermark 
for now.
+        return currentOutputWatermark;
+      }
+    } else {
+      // No bundle was started when we advanced the input watermark.
+      // Thus, we can safely set a new output watermark.
+      return potentialOutputWatermark;
+    }
+  }
+
+  private void preBundleStartCallback() {
+    inputWatermarkBeforeBundleStart = getEffectiveInputWatermark();
+  }
+
+  @SuppressWarnings("FutureReturnValueIgnored")
+  private void finishBundleCallback() {
+    minEventTimeTimerTimestampInLastBundle = 
minEventTimeTimerTimestampInCurrentBundle;
+    minEventTimeTimerTimestampInCurrentBundle = Long.MAX_VALUE;
+    try {
+      if (!closed
+          && minEventTimeTimerTimestampInLastBundle < Long.MAX_VALUE
+          && minEventTimeTimerTimestampInLastBundle <= 
getEffectiveInputWatermark()) {
+        ProcessingTimeService processingTimeService = 
getProcessingTimeService();
+        // We are scheduling a timer for advancing the watermark, to not delay 
finishing the bundle
+        // and temporarily release the checkpoint lock. Otherwise, we could 
potentially loop when a
+        // timer keeps scheduling a timer for the same timestamp.
+        processingTimeService.registerTimer(
+            processingTimeService.getCurrentProcessingTime(),
+            ts -> processWatermark1(new 
Watermark(getEffectiveInputWatermark())));
+      } else {
+        processWatermark1(new Watermark(getEffectiveInputWatermark()));
       }
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Failed to progress watermark to " + getEffectiveInputWatermark(), 
e);
     }
-    super.processWatermark(mark);
   }
 
   private static class SdkHarnessDoFnRunner<InputT, OutputT>
@@ -647,7 +721,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> 
extends DoFnOperator<I
     @Override
     public void processElement(WindowedValue<InputT> element) {
       try {
-        LOG.debug("Sending value: {}", element);
+        LOG.debug("Processing value: {}", element);
         mainInputReceiver.accept(element);
       } catch (Exception e) {
         throw new RuntimeException("Failed to process element with SDK 
harness.", e);
@@ -769,8 +843,14 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> 
extends DoFnOperator<I
             .collect(Collectors.toList());
 
     KeyedStateBackend<ByteBuffer> stateBackend = getKeyedStateBackend();
+
     StateCleaner stateCleaner =
-        new StateCleaner(userStates, windowCoder, () -> 
stateBackend.getCurrentKey());
+        new StateCleaner(
+            userStates,
+            windowCoder,
+            stateBackend::getCurrentKey,
+            timerInternals::hasPendingEventTimeTimers,
+            cleanupTimer);
 
     return new StatefulDoFnRunner<InputT, OutputT, BoundedWindow>(
         sdkHarnessRunner,
@@ -800,6 +880,8 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> 
extends DoFnOperator<I
         if (!stateCleaner.cleanupQueue.isEmpty()) {
           try (Locker locker = Locker.locked(stateBackendLock)) {
             stateCleaner.cleanupState(keyedStateInternals, 
stateBackend::setCurrentKey);
+          } catch (Exception e) {
+            throw new RuntimeException("Failed to cleanup state.", e);
           }
         }
       }
@@ -834,23 +916,27 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> 
extends DoFnOperator<I
     @Override
     public void setForWindow(InputT input, BoundedWindow window) {
       Preconditions.checkNotNull(input, "Null input passed to CleanupTimer");
-      // make sure this fires after any window.maxTimestamp() timers
-      Instant gcTime = LateDataUtils.garbageCollectionTime(window, 
windowingStrategy).plus(1);
       // needs to match the encoding in prepareStateBackend for state request 
handler
       final ByteBuffer key = FlinkKeyUtils.encodeKey(((KV) input).getKey(), 
keyCoder);
       // Ensure the state backend is not concurrently accessed by the state 
requests
       try (Locker locker = Locker.locked(stateBackendLock)) {
         keyedStateBackend.setCurrentKey(key);
-        timerInternals.setTimer(
-            StateNamespaces.window(windowCoder, window),
-            GC_TIMER_ID,
-            "",
-            gcTime,
-            window.maxTimestamp(),
-            TimeDomain.EVENT_TIME);
+        setCleanupTimer(window);
       }
     }
 
+    void setCleanupTimer(BoundedWindow window) {
+      // make sure this fires after any window.maxTimestamp() timers
+      Instant gcTime = LateDataUtils.garbageCollectionTime(window, 
windowingStrategy).plus(1);
+      timerInternals.setTimer(
+          StateNamespaces.window(windowCoder, window),
+          GC_TIMER_ID,
+          "",
+          gcTime,
+          window.maxTimestamp(),
+          TimeDomain.EVENT_TIME);
+    }
+
     @Override
     public boolean isForWindow(
         String timerId, BoundedWindow window, Instant timestamp, TimeDomain 
timeDomain) {
@@ -866,34 +952,52 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> 
extends DoFnOperator<I
     private final Coder windowCoder;
     private final ArrayDeque<KV<ByteBuffer, BoundedWindow>> cleanupQueue;
     private final Supplier<ByteBuffer> currentKeySupplier;
+    private final ThrowingFunction<Long, Boolean> hasPendingEventTimeTimers;
+    private final CleanupTimer cleanupTimer;
 
     StateCleaner(
-        List<String> userStateNames, Coder windowCoder, Supplier<ByteBuffer> 
currentKeySupplier) {
+        List<String> userStateNames,
+        Coder windowCoder,
+        Supplier<ByteBuffer> currentKeySupplier,
+        ThrowingFunction<Long, Boolean> hasPendingEventTimeTimers,
+        CleanupTimer cleanupTimer) {
       this.userStateNames = userStateNames;
       this.windowCoder = windowCoder;
       this.currentKeySupplier = currentKeySupplier;
+      this.hasPendingEventTimeTimers = hasPendingEventTimeTimers;
+      this.cleanupTimer = cleanupTimer;
       this.cleanupQueue = new ArrayDeque<>();
     }
 
     @Override
     public void clearForWindow(BoundedWindow window) {
+      // Delay cleanup until the end of the bundle to allow stateful 
processing and new timers.
       // Executed in the context of onTimer(..) where the correct key will be 
set
       cleanupQueue.add(KV.of(currentKeySupplier.get(), window));
     }
 
     @SuppressWarnings("ByteBufferBackingArray")
-    void cleanupState(StateInternals stateInternals, Consumer<ByteBuffer> 
keyContextConsumer) {
+    void cleanupState(StateInternals stateInternals, Consumer<ByteBuffer> 
keyContextConsumer)
+        throws Exception {
       while (!cleanupQueue.isEmpty()) {
-        KV<ByteBuffer, BoundedWindow> kv = cleanupQueue.remove();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("State cleanup for {} {}", 
Arrays.toString(kv.getKey().array()), kv.getValue());
-        }
+        KV<ByteBuffer, BoundedWindow> kv = 
Preconditions.checkNotNull(cleanupQueue.remove());
+        BoundedWindow window = Preconditions.checkNotNull(kv.getValue());
         keyContextConsumer.accept(kv.getKey());
-        for (String userState : userStateNames) {
-          StateNamespace namespace = StateNamespaces.window(windowCoder, 
kv.getValue());
-          StateTag<BagState<Void>> bagStateStateTag = StateTags.bag(userState, 
VoidCoder.of());
-          BagState<?> state = stateInternals.state(namespace, 
bagStateStateTag);
-          state.clear();
+        // Check whether we have pending timers which were set during the 
bundle.
+        if 
(hasPendingEventTimeTimers.apply(window.maxTimestamp().getMillis())) {
+          // Re-add GC timer and let remaining timers fire. Don't cleanup 
state yet.
+          cleanupTimer.setCleanupTimer(window);
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("State cleanup for {} {}", 
Arrays.toString(kv.getKey().array()), window);
+          }
+          // No more timers (finally!). Time to clean up.
+          for (String userState : userStateNames) {
+            StateNamespace namespace = StateNamespaces.window(windowCoder, 
window);
+            StateTag<BagState<Void>> bagStateStateTag = 
StateTags.bag(userState, VoidCoder.of());
+            BagState<?> state = stateInternals.state(namespace, 
bagStateStateTag);
+            state.clear();
+          }
         }
       }
     }
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
index b3c905c..c93a04f 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -53,6 +53,8 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Flink operator for executing splittable {@link DoFn DoFns}. Specifically, 
for executing the
@@ -61,6 +63,8 @@ import org.joda.time.Instant;
 public class SplittableDoFnOperator<InputT, OutputT, RestrictionT>
     extends DoFnOperator<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, 
OutputT> {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(SplittableDoFnOperator.class);
+
   private transient ScheduledExecutorService executorService;
 
   public SplittableDoFnOperator(
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
index 6d37fbc..c1ef358 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
@@ -66,7 +66,7 @@ import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.hamcrest.Matchers;
 import org.hamcrest.core.IsIterableContaining;
-import org.joda.time.Duration;
+import org.joda.time.Instant;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -291,8 +291,9 @@ public class FlinkSavepointTest implements Serializable {
     if (isPortablePipeline) {
       key =
           pipeline
-              .apply(Impulse.create())
+              .apply("ImpulseStage", Impulse.create())
               .apply(
+                  "KvMapperStage",
                   MapElements.via(
                       new InferableFunction<byte[], KV<String, Void>>() {
                         @Override
@@ -304,6 +305,7 @@ public class FlinkSavepointTest implements Serializable {
                         }
                       }))
               .apply(
+                  "TimerStage",
                   ParDo.of(
                       new DoFn<KV<String, Void>, KV<String, Long>>() {
                         @StateId("nextInteger")
@@ -311,14 +313,12 @@ public class FlinkSavepointTest implements Serializable {
                             StateSpecs.value();
 
                         @TimerId("timer")
-                        private final TimerSpec timer =
-                            TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+                        private final TimerSpec timer = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
 
                         @ProcessElement
                         public void processElement(
                             ProcessContext context, @TimerId("timer") Timer 
timer) {
-
-                          timer.offset(Duration.ZERO).setRelative();
+                          timer.set(new Instant(0));
                         }
 
                         @OnTimer("timer")
@@ -327,20 +327,20 @@ public class FlinkSavepointTest implements Serializable {
                             @StateId("nextInteger") ValueState<Long> 
nextInteger,
                             @TimerId("timer") Timer timer) {
                           Long current = nextInteger.read();
-                          if (current == null) {
-                            current = -1L;
-                          }
-                          long next = current + 1;
-                          nextInteger.write(next);
-                          context.output(KV.of("key", next));
-                          timer.offset(Duration.millis(100)).setRelative();
+                          current = current != null ? current : 0L;
+                          context.output(KV.of("key", current));
+                          LOG.debug("triggering timer {}", current);
+                          nextInteger.write(current + 1);
+                          // Trigger timer again and continue to hold back the 
watermark
+                          timer.withOutputTimestamp(new 
Instant(0)).set(context.fireTimestamp());
                         }
                       }));
     } else {
       key =
           pipeline
-              .apply(GenerateSequence.from(0))
+              .apply("IdGeneratorStage", GenerateSequence.from(0))
               .apply(
+                  "KvMapperStage",
                   ParDo.of(
                       new DoFn<Long, KV<String, Long>>() {
                         @ProcessElement
@@ -351,6 +351,7 @@ public class FlinkSavepointTest implements Serializable {
     }
     if (restored) {
       return key.apply(
+          "VerificationStage",
           ParDo.of(
               new DoFn<KV<String, Long>, String>() {
 
@@ -372,6 +373,7 @@ public class FlinkSavepointTest implements Serializable {
               }));
     } else {
       return key.apply(
+          "VerificationStage",
           ParDo.of(
               new DoFn<KV<String, Long>, String>() {
 
@@ -386,12 +388,14 @@ public class FlinkSavepointTest implements Serializable {
                     ProcessContext context,
                     @StateId("valueState") ValueState<Integer> intValueState,
                     @StateId("bagState") BagState<Integer> intBagState) {
-                  Long value = 
Objects.requireNonNull(context.element().getValue());
+                  long value = 
Objects.requireNonNull(context.element().getValue());
+                  LOG.debug("value: {} timestamp: {}", value, 
context.timestamp().getMillis());
                   if (value == 0L) {
                     intValueState.write(42);
                     intBagState.add(40);
                     intBagState.add(1);
                     intBagState.add(1);
+                  } else if (value >= 1) {
                     oneShotLatch.countDown();
                   }
                 }
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
index b761c01..1030632 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
@@ -17,13 +17,14 @@
  */
 package org.apache.beam.runners.flink;
 
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
 
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
@@ -33,7 +34,6 @@ import 
org.apache.beam.runners.core.construction.PipelineTranslation;
 import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.PortablePipelineOptions;
 import org.apache.beam.sdk.state.StateSpec;
@@ -66,6 +66,9 @@ import org.slf4j.LoggerFactory;
 /**
  * Tests the state and timer integration of {@link
  * 
org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator}.
+ *
+ * <p>The test sets the same timers multiple times per key. This tests that 
only the latest version
+ * of a given timer is run.
  */
 @RunWith(Parameterized.class)
 public class PortableTimersExecutionTest implements Serializable {
@@ -100,12 +103,13 @@ public class PortableTimersExecutionTest implements 
Serializable {
 
   @Test(timeout = 120_000)
   public void testTimerExecution() throws Exception {
-    PipelineOptions options = 
PipelineOptionsFactory.fromArgs("--experiments=beam_fn_api").create();
+    FlinkPipelineOptions options =
+        
PipelineOptionsFactory.fromArgs("--experiments=beam_fn_api").as(FlinkPipelineOptions.class);
     options.setRunner(CrashingRunner.class);
-    options.as(FlinkPipelineOptions.class).setFlinkMaster("[local]");
-    options.as(FlinkPipelineOptions.class).setStreaming(isStreaming);
-    options.as(FlinkPipelineOptions.class).setParallelism(2);
-    
options.as(FlinkPipelineOptions.class).setShutdownSourcesOnFinalWatermark(true);
+    options.setFlinkMaster("[local]");
+    options.setStreaming(isStreaming);
+    options.setParallelism(2);
+    options.setShutdownSourcesOnFinalWatermark(true);
     options
         .as(PortablePipelineOptions.class)
         .setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED);
@@ -116,6 +120,7 @@ public class PortableTimersExecutionTest implements 
Serializable {
     final int timerOutput = 4093;
     // Enough keys that we exercise interesting code paths
     int numKeys = 50;
+    int numDuplicateTimers = 15;
     List<KV<String, Integer>> input = new ArrayList<>();
     List<KV<String, Integer>> expectedOutput = new ArrayList<>();
 
@@ -123,7 +128,7 @@ public class PortableTimersExecutionTest implements 
Serializable {
       // Each key should have just one final output at GC time
       expectedOutput.add(KV.of(key.toString(), timerOutput));
 
-      for (int i = 0; i < 15; ++i) {
+      for (int i = 0; i < numDuplicateTimers; ++i) {
         // Each input should be output with the offset added
         input.add(KV.of(key.toString(), i));
         expectedOutput.add(KV.of(key.toString(), i + offset));
@@ -167,13 +172,18 @@ public class PortableTimersExecutionTest implements 
Serializable {
           @OnTimer(timerId)
           public void onTimer(
               @StateId(stateId) ValueState<String> state, 
OutputReceiver<KV<String, Integer>> r) {
-            r.output(KV.of(state.read(), timerOutput));
+            String read = Objects.requireNonNull(state.read(), "State must not 
be null");
+            KV<String, Integer> of = KV.of(read, timerOutput);
+            r.output(of);
           }
         };
 
     final Pipeline pipeline = Pipeline.create(options);
     PCollection<KV<String, Integer>> output =
-        
pipeline.apply(Impulse.create()).apply(ParDo.of(inputFn)).apply(ParDo.of(testFn));
+        pipeline
+            .apply("Impulse", Impulse.create())
+            .apply("Input", ParDo.of(inputFn))
+            .apply("Timers", ParDo.of(testFn));
     PAssert.that(output).containsInAnyOrder(expectedOutput);
 
     RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline);
@@ -185,9 +195,8 @@ public class PortableTimersExecutionTest implements 
Serializable {
                 "none",
                 flinkJobExecutor,
                 pipelineProto,
-                options.as(FlinkPipelineOptions.class),
-                new FlinkPipelineRunner(
-                    options.as(FlinkPipelineOptions.class), null, 
Collections.emptyList()));
+                options,
+                new FlinkPipelineRunner(options, null, 
Collections.emptyList()));
 
     jobInvocation.start();
     while (jobInvocation.getState() != Enum.DONE) {
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ImpulseSourceFunctionTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ImpulseSourceFunctionTest.java
index 85d3cfa..e45cc1f 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ImpulseSourceFunctionTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ImpulseSourceFunctionTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
@@ -75,8 +76,9 @@ public class ImpulseSourceFunctionTest {
     source.run(sourceContext);
     // 2) Should use checkpoint lock
     verify(sourceContext).getCheckpointLock();
-    // 3) Should emit impulse element
+    // 3) Should emit impulse element and the final watermark
     verify(sourceContext).collect(argThat(elementMatcher));
+    verify(sourceContext).emitWatermark(Watermark.MAX_WATERMARK);
     verifyNoMoreInteractions(sourceContext);
     // 4) Should modify checkpoint state
     verify(mockListState).get();
@@ -93,11 +95,13 @@ public class ImpulseSourceFunctionTest {
 
     // 1) Should finish
     source.run(sourceContext);
-    // 2) Should _not_ emit impulse element
-    verifyNoMoreInteractions(sourceContext);
-    // 3) Should keep checkpoint state
+    // 2) Should keep checkpoint state
     verify(mockListState).get();
     verifyNoMoreInteractions(mockListState);
+    // 3) Should always emit the final watermark
+    verify(sourceContext).emitWatermark(Watermark.MAX_WATERMARK);
+    // 4) Should _not_ emit impulse element
+    verifyNoMoreInteractions(sourceContext);
   }
 
   @Test(timeout = 10_000)
@@ -128,6 +132,7 @@ public class ImpulseSourceFunctionTest {
       sourceThread.join();
     }
     verify(sourceContext).collect(argThat(elementMatcher));
+    verify(sourceContext).emitWatermark(Watermark.MAX_WATERMARK);
     verify(mockListState).add(true);
     verify(mockListState).get();
     verifyNoMoreInteractions(mockListState);
@@ -162,7 +167,9 @@ public class ImpulseSourceFunctionTest {
     sourceThread.interrupt();
     sourceThread.join();
 
-    // nothing should have been emitted because the impulse was emitted before 
restore
+    // Should always emit the final watermark
+    verify(sourceContext).emitWatermark(Watermark.MAX_WATERMARK);
+    // no element should have been emitted because the impulse was emitted 
before restore
     verifyNoMoreInteractions(sourceContext);
   }
 
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
index 75fc6f9..c569045 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
@@ -250,6 +250,7 @@ public class DoFnOperatorTest {
   public void testWatermarkContract() throws Exception {
 
     final Instant timerTimestamp = new Instant(1000);
+    final Instant timerOutputTimestamp = timerTimestamp.minus(1);
     final String eventTimeMessage = "Event timer fired: ";
     final String processingTimeMessage = "Processing timer fired";
 
@@ -279,7 +280,7 @@ public class DoFnOperatorTest {
               @TimerId(processingTimerId) Timer processingTimer) {
             eventTimer.set(timerTimestamp);
             eventTimerWithOutputTimestamp
-                .withOutputTimestamp(timerTimestamp.minus(1))
+                .withOutputTimestamp(timerOutputTimestamp)
                 .set(timerTimestamp);
             
processingTimer.offset(Duration.millis(timerTimestamp.getMillis())).setRelative();
           }
@@ -365,7 +366,7 @@ public class DoFnOperatorTest {
     assertThat(stripStreamRecordFromWindowedValue(testHarness.getOutput()), 
emptyIterable());
     assertThat(
         doFnOperator.timerInternals.getMinOutputTimestampMs(),
-        is(timerTimestamp.minus(1).getMillis()));
+        is(timerOutputTimestamp.getMillis()));
 
     // this must fire the event timers
     testHarness.processWatermark(timerTimestamp.getMillis() + 1);
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
index 0a59091..02229dc 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
 import static 
org.apache.beam.runners.core.construction.PTransformTranslation.PAR_DO_TRANSFORM_URN;
+import static 
org.apache.beam.runners.flink.translation.wrappers.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.instanceOf;
@@ -25,10 +26,8 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.iterableWithSize;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
@@ -47,6 +46,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
+import java.util.function.BiConsumer;
 import javax.annotation.Nullable;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
@@ -88,6 +88,7 @@ import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -377,10 +378,7 @@ public class ExecutableStageDoFnOperatorTest {
 
     testHarness.close(); // triggers finish bundle
 
-    assertThat(
-        testHarness.getOutput(),
-        contains(
-            new StreamRecord<>(three), new Watermark(watermark), new 
Watermark(Long.MAX_VALUE)));
+    assertThat(stripStreamRecordFromWindowedValue(testHarness.getOutput()), 
contains(three));
 
     assertThat(
         testHarness.getSideOutput(tagsToOutputTags.get(additionalOutput1)),
@@ -392,6 +390,139 @@ public class ExecutableStageDoFnOperatorTest {
   }
 
   @Test
+  public void testWatermarkHandling() throws Exception {
+    TupleTag<Integer> mainOutput = new TupleTag<>("main-output");
+    DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory 
=
+        new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, 
VoidCoder.of());
+    ExecutableStageDoFnOperator<KV<String, Integer>, Integer> operator =
+        getOperator(
+            mainOutput,
+            Collections.emptyList(),
+            outputManagerFactory,
+            WindowingStrategy.of(FixedWindows.of(Duration.millis(10))),
+            StringUtf8Coder.of(),
+            WindowedValue.getFullCoder(
+                KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()), 
IntervalWindow.getCoder()));
+
+    KeyedOneInputStreamOperatorTestHarness<
+            String, WindowedValue<KV<String, Integer>>, WindowedValue<Integer>>
+        testHarness =
+            new KeyedOneInputStreamOperatorTestHarness<>(
+                operator,
+                val -> val.getValue().getKey(),
+                new CoderTypeInformation<>(StringUtf8Coder.of()));
+    RemoteBundle bundle = Mockito.mock(RemoteBundle.class);
+    when(bundle.getInputReceivers())
+        .thenReturn(
+            ImmutableMap.<String, FnDataReceiver<WindowedValue>>builder()
+                .put("input", Mockito.mock(FnDataReceiver.class))
+                .build());
+    when(bundle.getTimerReceivers())
+        .thenReturn(
+            ImmutableMap.<KV<String, String>, 
FnDataReceiver<WindowedValue>>builder()
+                .put(KV.of("transform", "timer"), 
Mockito.mock(FnDataReceiver.class))
+                .put(KV.of("transform", "timer2"), 
Mockito.mock(FnDataReceiver.class))
+                .put(KV.of("transform", "timer3"), 
Mockito.mock(FnDataReceiver.class))
+                .build());
+    when(stageBundleFactory.getBundle(any(), any(), any(), 
any())).thenReturn(bundle);
+
+    testHarness.open();
+    assertThat(
+        operator.getCurrentOutputWatermark(), 
is(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()));
+
+    // No bundle has been started, watermark can be freely advanced
+    testHarness.processWatermark(0);
+    assertThat(operator.getCurrentOutputWatermark(), is(0L));
+
+    // Trigger a new bundle
+    IntervalWindow intervalWindow = new IntervalWindow(new Instant(0), new 
Instant(9));
+    WindowedValue<KV<String, Integer>> windowedValue =
+        WindowedValue.of(KV.of("one", 1), Instant.now(), intervalWindow, 
PaneInfo.NO_FIRING);
+    testHarness.processElement(new StreamRecord<>(windowedValue));
+
+    // The output watermark should be held back during the bundle
+    testHarness.processWatermark(1);
+    assertThat(operator.getEffectiveInputWatermark(), is(1L));
+    assertThat(operator.getCurrentOutputWatermark(), is(0L));
+
+    // After the bundle has been finished, the watermark should be advanced
+    operator.invokeFinishBundle();
+    assertThat(operator.getCurrentOutputWatermark(), is(1L));
+
+    // Bundle finished, watermark can be freely advanced
+    testHarness.processWatermark(2);
+    assertThat(operator.getEffectiveInputWatermark(), is(2L));
+    assertThat(operator.getCurrentOutputWatermark(), is(2L));
+
+    // Trigger a new bundle
+    testHarness.processElement(new StreamRecord<>(windowedValue));
+    assertThat(testHarness.numEventTimeTimers(), is(1)); // cleanup timer
+
+    // Set at timer
+    Instant timerTarget = new Instant(5);
+    Instant timerTarget2 = new Instant(6);
+    operator.getLockToAcquireForStateAccessDuringBundles().lock();
+
+    BiConsumer<String, Instant> timerConsumer =
+        (timerId, timestamp) ->
+            operator.setTimer(
+                Timer.of(
+                    windowedValue.getValue().getKey(),
+                    "",
+                    windowedValue.getWindows(),
+                    timestamp,
+                    timestamp,
+                    PaneInfo.NO_FIRING),
+                TimerInternals.TimerData.of(
+                    TimerReceiverFactory.encodeToTimerDataTimerId("transform", 
timerId),
+                    "",
+                    StateNamespaces.window(IntervalWindow.getCoder(), 
intervalWindow),
+                    timestamp,
+                    timestamp,
+                    TimeDomain.EVENT_TIME));
+
+    timerConsumer.accept("timer", timerTarget);
+    timerConsumer.accept("timer2", timerTarget2);
+    assertThat(testHarness.numEventTimeTimers(), is(3));
+
+    // Advance input watermark past the timer
+    // Check the output watermark is held back
+    long targetWatermark = timerTarget.getMillis() + 100;
+    testHarness.processWatermark(targetWatermark);
+    // Do not yet advance the output watermark because we are still processing 
a bundle
+    assertThat(testHarness.numEventTimeTimers(), is(3));
+    assertThat(operator.getCurrentOutputWatermark(), is(2L));
+
+    // Check that the timers are fired but the output watermark is advanced no 
further than
+    // the minimum timer timestamp of the previous bundle because we are still 
processing a
+    // bundle which might contain more timers.
+    // Timers can create loops if they keep rescheduling themselves when firing
+    // Thus, we advance the watermark asynchronously to allow for 
checkpointing to run
+    operator.invokeFinishBundle();
+    assertThat(testHarness.numEventTimeTimers(), is(3));
+    testHarness.setProcessingTime(testHarness.getProcessingTime() + 1);
+    assertThat(testHarness.numEventTimeTimers(), is(0));
+    assertThat(operator.getCurrentOutputWatermark(), is(5L));
+
+    // Output watermark is advanced synchronously when the bundle finishes,
+    // no more timers are scheduled
+    operator.invokeFinishBundle();
+    assertThat(operator.getCurrentOutputWatermark(), is(targetWatermark));
+    assertThat(testHarness.numEventTimeTimers(), is(0));
+
+    // Watermark is advanced in a blocking fashion on close, not via a timers
+    // Create a bundle with a pending timer to simulate that
+    testHarness.processElement(new StreamRecord<>(windowedValue));
+    timerConsumer.accept("timer3", new Instant(targetWatermark));
+    assertThat(testHarness.numEventTimeTimers(), is(1));
+
+    // This should be blocking until the watermark reaches Long.MAX_VALUE.
+    testHarness.close();
+    assertThat(testHarness.numEventTimeTimers(), is(0));
+    assertThat(operator.getCurrentOutputWatermark(), is(Long.MAX_VALUE));
+  }
+
+  @Test
   public void testStageBundleClosed() throws Exception {
     TupleTag<Integer> mainOutput = new TupleTag<>("main-output");
     DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory 
=
@@ -492,7 +623,7 @@ public class ExecutableStageDoFnOperatorTest {
   }
 
   @Test
-  public void testEnsureStateCleanupWithKeyedInputStateCleaner() {
+  public void testEnsureStateCleanupWithKeyedInputStateCleaner() throws Exception {
     GlobalWindow.Coder windowCoder = GlobalWindow.Coder.INSTANCE;
     InMemoryStateInternals<String> stateInternals = InMemoryStateInternals.forKey("key");
     List<String> userStateNames = ImmutableList.of("state1", "state2");
@@ -514,13 +645,13 @@ public class ExecutableStageDoFnOperatorTest {
     // Test that state is cleaned up correctly
     ExecutableStageDoFnOperator.StateCleaner stateCleaner =
         new ExecutableStageDoFnOperator.StateCleaner(
-            userStateNames, windowCoder, () -> key.getValue());
+            userStateNames, windowCoder, key::getValue, ts -> false, null);
     for (BagState<String> bagState : bagStates) {
       assertThat(Iterables.size(bagState.read()), is(1));
     }
 
     stateCleaner.clearForWindow(GlobalWindow.INSTANCE);
-    stateCleaner.cleanupState(stateInternals, (k) -> key.setValue(k));
+    stateCleaner.cleanupState(stateInternals, key::setValue);
 
     for (BagState<String> bagState : bagStates) {
       assertThat(Iterables.size(bagState.read()), is(0));
@@ -530,6 +661,10 @@ public class ExecutableStageDoFnOperatorTest {
   @Test
   public void testEnsureDeferredStateCleanupTimerFiring() throws Exception {
     testEnsureDeferredStateCleanupTimerFiring(false);
+  }
+
+  @Test
+  public void testEnsureDeferredStateCleanupTimerFiringWithCheckpointing() throws Exception {
     testEnsureDeferredStateCleanupTimerFiring(true);
   }
 
@@ -561,7 +696,7 @@ public class ExecutableStageDoFnOperatorTest {
     AtomicBoolean timerInputReceived = new AtomicBoolean();
     IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(1000));
     IntervalWindow.IntervalWindowCoder windowCoder = IntervalWindow.IntervalWindowCoder.of();
-    WindowedValue<KV<String, Integer>> one =
+    WindowedValue<KV<String, Integer>> windowedValue =
         WindowedValue.of(
             KV.of("one", 1), window.maxTimestamp(), ImmutableList.of(window), PaneInfo.NO_FIRING);
 
@@ -579,36 +714,37 @@ public class ExecutableStageDoFnOperatorTest {
     when(bundle.getTimerReceivers()).thenReturn(ImmutableMap.of(timerInputKey, timerReceiver));
 
     KeyedOneInputStreamOperatorTestHarness<
-            String, WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>>
+            ByteBuffer, WindowedValue<KV<String, Integer>>, WindowedValue<Integer>>
         testHarness =
             new KeyedOneInputStreamOperatorTestHarness(
-                operator, val -> val, new CoderTypeInformation<>(keyCoder));
+                operator,
+                operator.keySelector,
+                new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of()));
 
     testHarness.open();
 
-    Lock stateBackendLock = (Lock) Whitebox.getInternalState(operator, "stateBackendLock");
+    Lock stateBackendLock = Whitebox.getInternalState(operator, "stateBackendLock");
     stateBackendLock.lock();
 
     KeyedStateBackend<ByteBuffer> keyedStateBackend = operator.getKeyedStateBackend();
-    ByteBuffer key = FlinkKeyUtils.encodeKey(one.getValue().getKey(), keyCoder);
+    ByteBuffer key = FlinkKeyUtils.encodeKey(windowedValue.getValue().getKey(), keyCoder);
     keyedStateBackend.setCurrentKey(key);
 
     DoFnOperator.FlinkTimerInternals timerInternals =
-        (DoFnOperator.FlinkTimerInternals) Whitebox.getInternalState(operator, "timerInternals");
+        Whitebox.getInternalState(operator, "timerInternals");
 
     Object doFnRunner = Whitebox.getInternalState(operator, "doFnRunner");
     Object delegate = Whitebox.getInternalState(doFnRunner, "delegate");
     Object stateCleaner = Whitebox.getInternalState(delegate, "stateCleaner");
-    Collection<?> cleanupTimers =
-        (Collection) Whitebox.getInternalState(stateCleaner, "cleanupQueue");
+    Collection<?> cleanupQueue = Whitebox.getInternalState(stateCleaner, "cleanupQueue");
 
     // create some state which can be cleaned up
     assertThat(testHarness.numKeyedStateEntries(), is(0));
     StateNamespace stateNamespace = StateNamespaces.window(windowCoder, window);
-    BagState<String> state =
+    BagState<ByteString> state = // State from the SDK Harness is stored as ByteStrings
         operator.keyedStateInternals.state(
-            stateNamespace, StateTags.bag(stateId, StringUtf8Coder.of()));
-    state.add("testUserState");
+            stateNamespace, StateTags.bag(stateId, ByteStringCoder.of()));
+    state.add(ByteString.copyFrom("userstate".getBytes(Charsets.UTF_8)));
     assertThat(testHarness.numKeyedStateEntries(), is(1));
 
     // user timer that fires after the end of the window and after state cleanup
@@ -617,51 +753,84 @@ public class ExecutableStageDoFnOperatorTest {
             TimerReceiverFactory.encodeToTimerDataTimerId(
                 timerInputKey.getKey(), timerInputKey.getValue()),
             stateNamespace,
-            window.maxTimestamp().plus(1),
+            window.maxTimestamp(),
             TimeDomain.EVENT_TIME);
     timerInternals.setTimer(userTimer);
 
     // start of bundle
-    testHarness.processElement(new StreamRecord<>(one));
-    verify(receiver).accept(one);
+    testHarness.processElement(new StreamRecord<>(windowedValue));
+    verify(receiver).accept(windowedValue);
 
-    // move watermark past cleanup and user timer while bundle in progress
-    operator.processWatermark(new Watermark(window.maxTimestamp().plus(2).getMillis()));
+    // move watermark past user timer while bundle is in progress
+    testHarness.processWatermark(new Watermark(window.maxTimestamp().plus(1).getMillis()));
 
-    // due to watermark hold the timers won't fire at this point
-    assertFalse("Watermark must be held back until bundle is complete.", timerInputReceived.get());
-    assertThat(cleanupTimers, hasSize(0));
+    // Output watermark is held back and timers do not yet fire (they can still be changed!)
+    assertThat(timerInputReceived.get(), is(false));
+    assertThat(
+        operator.getCurrentOutputWatermark(), is(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()));
+    // The timer fires on bundle finish
+    operator.invokeFinishBundle();
+    assertThat(timerInputReceived.getAndSet(false), is(true));
 
-    if (withCheckpointing) {
-      // Upon checkpointing, the bundle is finished and the watermark advances;
-      // timers can fire. Note: The bundle is ensured to be finished.
-      testHarness.snapshot(0, 0);
+    // Move watermark past the cleanup timer
+    testHarness.processWatermark(new Watermark(window.maxTimestamp().plus(2).getMillis()));
+    operator.invokeFinishBundle();
 
-      // The user timer was scheduled to fire after cleanup, but executes first
-      assertTrue("Timer should have been triggered.", timerInputReceived.get());
-      // Cleanup will be executed after the bundle is complete
-      assertThat(cleanupTimers, hasSize(0));
-      verifyNoMoreInteractions(receiver);
-    } else {
-      // Upon finishing a bundle, the watermark advances; timers can fire.
-      // Note that this will finish the current bundle, but will also start a new one
-      // when timers fire as part of advancing the watermark
-      operator.invokeFinishBundle();
+    // Cleanup timer has fired and cleanup queue is prepared for bundle finish
+    assertThat(testHarness.numEventTimeTimers(), is(0));
+    assertThat(testHarness.numKeyedStateEntries(), is(1));
+    assertThat(cleanupQueue, hasSize(1));
 
-      // The user timer was scheduled to fire after cleanup, but executes first
-      assertTrue("Timer should have been triggered.", timerInputReceived.get());
-      // Cleanup will be executed after the bundle is complete
-      assertThat(cleanupTimers, hasSize(1));
-      verifyNoMoreInteractions(receiver);
+    // Cleanup timer are rescheduled if a new timer is created during the bundle
+    TimerInternals.TimerData userTimer2 =
+        TimerInternals.TimerData.of(
+            TimerReceiverFactory.encodeToTimerDataTimerId(
+                timerInputKey.getKey(), timerInputKey.getValue()),
+            stateNamespace,
+            window.maxTimestamp(),
+            TimeDomain.EVENT_TIME);
+    operator.setTimer(
+        Timer.of(
+            windowedValue.getValue().getKey(),
+            "",
+            windowedValue.getWindows(),
+            window.maxTimestamp(),
+            window.maxTimestamp(),
+            PaneInfo.NO_FIRING),
+        userTimer2);
+    assertThat(testHarness.numEventTimeTimers(), is(1));
 
-      // Finish bundle which has been started by finishing the bundle
+    if (withCheckpointing) {
+      // Upon checkpointing, the bundle will be finished.
+      testHarness.snapshot(0, 0);
+    } else {
       operator.invokeFinishBundle();
-      assertThat(cleanupTimers, hasSize(0));
     }
 
+    // Cleanup queue has been processed and cleanup timer has been re-added due to pending timers
+    // for the window.
+    assertThat(cleanupQueue, hasSize(0));
+    verifyNoMoreInteractions(receiver);
+    assertThat(testHarness.numKeyedStateEntries(), is(2));
+    assertThat(testHarness.numEventTimeTimers(), is(2));
+
+    // No timer has been fired but bundle should be ended
+    assertThat(timerInputReceived.get(), is(false));
+    assertThat(Whitebox.getInternalState(operator, "bundleStarted"), is(false));
+
+    // Allow user timer and cleanup timer to fire by triggering watermark advancement
+    testHarness.setProcessingTime(testHarness.getProcessingTime() + 1);
+    assertThat(timerInputReceived.getAndSet(false), is(true));
+    assertThat(cleanupQueue, hasSize(1));
+
+    // Cleanup will be executed after the bundle is complete because there are no more pending
+    // timers for the window
+    operator.invokeFinishBundle();
+    assertThat(cleanupQueue, hasSize(0));
     assertThat(testHarness.numKeyedStateEntries(), is(0));
 
     testHarness.close();
+    verifyNoMoreInteractions(receiver);
   }
 
   @Test
@@ -851,7 +1020,8 @@ public class ExecutableStageDoFnOperatorTest {
    * #runtimeContext}. The context factory is mocked to return {@link #stageContext} every time. The
    * behavior of the stage context itself is unchanged.
    */
-  private ExecutableStageDoFnOperator<Integer, Integer> getOperator(
+  @SuppressWarnings("rawtypes")
+  private ExecutableStageDoFnOperator getOperator(
       TupleTag<Integer> mainOutput,
       List<TupleTag<?>> additionalOutputs,
       DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory) {
@@ -864,7 +1034,8 @@ public class ExecutableStageDoFnOperatorTest {
         WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE));
   }
 
-  private ExecutableStageDoFnOperator<Integer, Integer> getOperator(
+  @SuppressWarnings("rawtypes")
+  private ExecutableStageDoFnOperator getOperator(
       TupleTag<Integer> mainOutput,
       List<TupleTag<?>> additionalOutputs,
       DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory,
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
index fdb84c4..d552f5e 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
@@ -152,7 +152,7 @@ public class WindowDoFnOperatorTest {
     assertThat(Iterables.size(timerInternals.pendingTimersById.keys()), is(1));
 
     assertThat(testHarness.numKeyedStateEntries(), is(6));
-    assertThat(windowDoFnOperator.currentOutputWatermark, is(1L));
+    assertThat(windowDoFnOperator.getCurrentOutputWatermark(), is(1L));
     assertThat(timerInternals.getMinOutputTimestampMs(), is(Long.MAX_VALUE));
 
     // close window
@@ -162,7 +162,7 @@ public class WindowDoFnOperatorTest {
     assertThat(Iterables.size(timerInternals.pendingTimersById.keys()), is(0));
 
     assertThat(testHarness.numKeyedStateEntries(), is(3));
-    assertThat(windowDoFnOperator.currentOutputWatermark, is(100L));
+    assertThat(windowDoFnOperator.getCurrentOutputWatermark(), is(100L));
     assertThat(timerInternals.getMinOutputTimestampMs(), is(Long.MAX_VALUE));
 
     testHarness.processWatermark(200L);

Reply via email to