[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166016&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166016 ]

ASF GitHub Bot logged work on BEAM-4681:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Nov/18 18:52
            Start Date: 14/Nov/18 18:52
    Worklog Time Spent: 10m 
      Work Description: tweise closed pull request #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode
URL: https://github.com/apache/beam/pull/6981
 
 
   

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance.
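For readers skimming the change: the PR wires Beam's user-facing timer API
through the Flink portable runner in streaming mode. As a minimal sketch of
the kind of DoFn this enables (condensed from the PortableTimersExecutionTest
added in the diff; the class name TimerFn is illustrative):

    import org.apache.beam.sdk.state.TimeDomain;
    import org.apache.beam.sdk.state.Timer;
    import org.apache.beam.sdk.state.TimerSpec;
    import org.apache.beam.sdk.state.TimerSpecs;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.values.KV;

    // A keyed DoFn that sets an event-time timer for the end of the window.
    class TimerFn extends DoFn<KV<String, Integer>, KV<String, Integer>> {

      @TimerId("expiry")
      private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

      @ProcessElement
      public void processElement(
          ProcessContext context, BoundedWindow window, @TimerId("expiry") Timer timer) {
        // Timers are scoped per key and window; this one fires when the
        // watermark passes the end of the window.
        timer.set(window.maxTimestamp());
        context.output(context.element());
      }

      @OnTimer("expiry")
      public void onTimer(OutputReceiver<KV<String, Integer>> out) {
        // Runs in the SDK harness once the runner delivers the fired timer.
        out.output(KV.of("done", 0));
      }
    }

The full diff follows: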

diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index bd83ad1ea47..ff1d38b859f 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -243,6 +243,7 @@ class BeamModulePlugin implements Plugin<Project> {
       excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
       excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
       excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
+      // TODO Enable test once timer-support for batch is merged
       excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo'
       //SplitableDoFnTests
       excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
@@ -1471,6 +1472,9 @@ artifactId=${project.name}
         "--runner=org.apache.beam.runners.reference.testing.TestPortableRunner",
         "--jobServerDriver=${config.jobServerDriver}",
         "--environmentCacheMillis=10000",
+        // TODO Create two tasks to run for both batch and streaming:
+        // https://issues.apache.org/jira/browse/BEAM-6009
+        // "--streaming"
       ]
       if (config.jobServerConfig) {
         beamTestPipelineOptions.add("--jobServerConfig=${config.jobServerConfig}")
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index bf25dc65013..a8adc97edcf 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.flink;
 
+import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.getWindowingStrategy;
 import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.instantiateCoder;
 
 import com.fasterxml.jackson.databind.JsonNode;
@@ -562,8 +563,7 @@ private void translateStreamingImpulse(
 
     Coder keyCoder = null;
     KeySelector<WindowedValue<InputT>, ?> keySelector = null;
-    final boolean stateful = stagePayload.getUserStatesCount() > 0;
-    if (stateful) {
+    if (stagePayload.getUserStatesCount() > 0 || stagePayload.getTimersCount() > 0) {
       // Stateful stages are only allowed for KV input
       Coder valueCoder =
           ((WindowedValue.FullWindowedValueCoder) windowedInputCoder).getValueCoder();
@@ -601,6 +601,7 @@ private void translateStreamingImpulse(
             context.getJobInfo(),
             FlinkExecutableStageContext.factory(context.getPipelineOptions()),
             collectionIdToTupleTag,
+            getWindowingStrategy(inputPCollectionId, components),
             keyCoder,
             keySelector);
 
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/FlinkPipelineTranslatorUtils.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/FlinkPipelineTranslatorUtils.java
index 1710a27da7c..2ea3b9323d3 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/FlinkPipelineTranslatorUtils.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/FlinkPipelineTranslatorUtils.java
@@ -22,10 +22,14 @@
 import com.google.common.collect.Sets;
 import java.io.IOException;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
 import org.apache.beam.runners.core.construction.graph.PipelineNode;
 import org.apache.beam.runners.fnexecution.wire.WireCoders;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.grpc.v1_13_1.com.google.protobuf.InvalidProtocolBufferException;
 
 /** Utilities for pipeline translation. */
 public final class FlinkPipelineTranslatorUtils {
@@ -54,4 +58,23 @@ private FlinkPipelineTranslatorUtils() {}
       throw new RuntimeException("Could not instantiate Coder", e);
     }
   }
+
+  public static WindowingStrategy getWindowingStrategy(
+      String pCollectionId, RunnerApi.Components components) {
+    RunnerApi.WindowingStrategy windowingStrategyProto =
+        components.getWindowingStrategiesOrThrow(
+            components.getPcollectionsOrThrow(pCollectionId).getWindowingStrategyId());
+
+    final WindowingStrategy<?, ?> windowingStrategy;
+    try {
+      return WindowingStrategyTranslation.fromProto(
+          windowingStrategyProto, RehydratedComponents.forComponents(components));
+    } catch (InvalidProtocolBufferException e) {
+      throw new IllegalStateException(
+          String.format(
+              "Unable to hydrate windowing strategy %s for %s.",
+              windowingStrategyProto, pCollectionId),
+          e);
+    }
+  }
 }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 3a92808eddd..57650ab0991 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -153,9 +153,9 @@
 
   private final Map<TupleTag<?>, Coder<?>> outputCoders;
 
-  private final Coder<?> keyCoder;
+  protected final Coder<?> keyCoder;
 
-  private final KeySelector<WindowedValue<InputT>, ?> keySelector;
+  final KeySelector<WindowedValue<InputT>, ?> keySelector;
 
   private final TimerInternals.TimerDataCoder timerCoder;
 
@@ -289,7 +289,6 @@ public void initializeState(StateInitializationContext context) throws Exception
     setCurrentSideInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
     setCurrentOutputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
 
-    FlinkPipelineOptions options = serializedOptions.get().as(FlinkPipelineOptions.class);
     sideInputReader = NullSideInputReader.of(sideInputs);
 
     // maybe init by initializeState
@@ -333,16 +332,18 @@ public void initializeState(StateInitializationContext context) throws Exception
 
       timerInternals = new FlinkTimerInternals();
     }
+  }
 
+  @Override
+  public void open() throws Exception {
     // WindowDoFnOperator need use state and timer to get DoFn.
     // So must wait StateInternals and TimerInternals ready.
+    // This will be called after initializeState()
     this.doFn = getDoFn();
     doFnInvoker = DoFnInvokers.invokerFor(doFn);
-
     doFnInvoker.invokeSetup();
 
-    StepContext stepContext = new FlinkStepContext();
-
+    FlinkPipelineOptions options = serializedOptions.get().as(FlinkPipelineOptions.class);
     doFnRunner =
         DoFnRunners.simpleRunner(
             options,
@@ -351,7 +352,7 @@ public void initializeState(StateInitializationContext context) throws Exception
             outputManager,
             mainOutputTag,
             additionalOutputTags,
-            stepContext,
+            new FlinkStepContext(),
             inputCoder,
             outputCoders,
             windowingStrategy);
@@ -442,22 +443,21 @@ public void close() throws Exception {
     }
   }
 
-  private long getPushbackWatermarkHold() {
+  protected long getPushbackWatermarkHold() {
     return pushedBackWatermark;
   }
 
+  protected void setPushedBackWatermark(long watermark) {
+    pushedBackWatermark = watermark;
+  }
+
   @Override
-  public final void processElement(StreamRecord<WindowedValue<InputT>> streamRecord)
-      throws Exception {
+  public final void processElement(StreamRecord<WindowedValue<InputT>> streamRecord) {
     checkInvokeStartBundle();
     doFnRunner.processElement(streamRecord.getValue());
     checkInvokeFinishBundleByCount();
   }
 
-  private void setPushedBackWatermark(long watermark) {
-    pushedBackWatermark = watermark;
-  }
-
   @Override
   public final void processElement1(StreamRecord<WindowedValue<InputT>> streamRecord)
       throws Exception {
@@ -669,7 +669,7 @@ private void checkInvokeFinishBundleByTime() {
     }
   }
 
-  private void invokeFinishBundle() {
+  protected void invokeFinishBundle() {
     if (bundleStarted) {
       pushbackDoFnRunner.finishBundle();
       bundleStarted = false;
@@ -916,7 +916,7 @@ public TimerInternals timerInternals() {
     }
   }
 
-  private class FlinkTimerInternals implements TimerInternals {
+  class FlinkTimerInternals implements TimerInternals {
 
     @Override
     public void setTimer(
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index eb6461113ef..984817fdb66 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -18,17 +18,20 @@
 package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.EnumMap;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey.TypeCase;
@@ -38,12 +41,16 @@
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaces;
 import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.construction.Timer;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.TimerReference;
 import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
 import org.apache.beam.runners.flink.translation.functions.FlinkStreamingSideInputHandlerFactory;
 import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
 import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
 import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
+import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors.TimerSpec;
 import org.apache.beam.runners.fnexecution.control.RemoteBundle;
 import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
@@ -57,6 +64,7 @@
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -64,6 +72,8 @@
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -75,8 +85,6 @@
 * received elements to the SDK harness and emits the received back elements to the downstream
  * operators. It also takes care of handling side inputs and state.
  *
- * <p>TODO Integrate support for timers
- *
  * <p>TODO Integrate support for progress updates and metrics
  */
public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<InputT, OutputT> {
@@ -88,14 +96,14 @@
   private final FlinkExecutableStageContext.Factory contextFactory;
   private final Map<String, TupleTag<?>> outputMap;
   private final Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> sideInputIds;
+  private final boolean usesTimers;
 
   private transient FlinkExecutableStageContext stageContext;
   private transient StateRequestHandler stateRequestHandler;
   private transient BundleProgressHandler progressHandler;
   private transient StageBundleFactory stageBundleFactory;
-  private transient LinkedBlockingQueue<KV<String, OutputT>> outputQueue;
   private transient ExecutableStage executableStage;
-  private transient RemoteBundle remoteBundle;
+  private transient SdkHarnessDoFnRunner<InputT, OutputT> sdkHarnessRunner;
 
   public ExecutableStageDoFnOperator(
       String stepName,
@@ -113,6 +121,7 @@ public ExecutableStageDoFnOperator(
       JobInfo jobInfo,
       FlinkExecutableStageContext.Factory contextFactory,
       Map<String, TupleTag<?>> outputMap,
+      WindowingStrategy windowingStrategy,
       Coder keyCoder,
       KeySelector<WindowedValue<InputT>, ?> keySelector) {
     super(
@@ -124,7 +133,7 @@ public ExecutableStageDoFnOperator(
         mainOutputTag,
         additionalOutputTags,
         outputManagerFactory,
-        WindowingStrategy.globalDefault() /* unused */,
+        windowingStrategy,
         sideInputTagMapping,
         sideInputs,
         options,
@@ -135,12 +144,11 @@ public ExecutableStageDoFnOperator(
     this.contextFactory = contextFactory;
     this.outputMap = outputMap;
     this.sideInputIds = sideInputIds;
+    this.usesTimers = payload.getTimersCount() > 0;
   }
 
   @Override
   public void open() throws Exception {
-    super.open();
-
     executableStage = ExecutableStage.fromPayload(payload);
     // TODO: Wire this into the distributed cache and make it pluggable.
     // TODO: Do we really want this layer of indirection when accessing the stage bundle factory?
@@ -152,7 +160,9 @@ public void open() throws Exception {
     stageBundleFactory = stageContext.getStageBundleFactory(executableStage);
     stateRequestHandler = getStateRequestHandler(executableStage);
     progressHandler = BundleProgressHandler.ignored();
-    outputQueue = new LinkedBlockingQueue<>();
+
+    // This will call {@code createWrappingDoFnRunner} which needs the above dependencies.
+    super.open();
   }
 
   private StateRequestHandler getStateRequestHandler(ExecutableStage executableStage) {
@@ -270,8 +280,38 @@ public void setKeyContextElement1(StreamRecord record) throws Exception {
 
   @Override
   public void setCurrentKey(Object key) {
-    throw new UnsupportedOperationException(
-        "Current key for state backend can only be set by state requests from SDK workers.");
+    // We don't need to set anything, the key is set manually on the state backend
+    // This will be called by HeapInternalTimerService before a timer is fired
+    if (!usesTimers) {
+      throw new UnsupportedOperationException(
+          "Current key for state backend can only be set by state requests from SDK workers or when processing timers.");
+    }
+  }
+
+  @Override
+  public Object getCurrentKey() {
+    // This is the key retrieved by HeapInternalTimerService when setting a Flink timer
+    return sdkHarnessRunner.getTimerKeyForRegistration();
+  }
+
+  @Override
+  public void fireTimer(InternalTimer<?, TimerInternals.TimerData> timer) {
+    // We need to decode the key
+    final ByteBuffer encodedKey = (ByteBuffer) timer.getKey();
+    @SuppressWarnings("ByteBufferBackingArray")
+    byte[] bytes = encodedKey.array();
+    ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
+    final Object decodedKey;
+    try {
+      decodedKey = keyCoder.decode(byteStream);
+    } catch (IOException e) {
+      throw new RuntimeException(
+          String.format(
+              Locale.ENGLISH, "Failed to decode encoded key: %s", Arrays.toString(bytes)),
+          e);
+    // Prepare the SdkHarnessRunner with the key for the timer
+    sdkHarnessRunner.setTimerKeyForFire(decodedKey);
+    super.fireTimer(timer);
   }
 
   @Override
@@ -304,16 +344,129 @@ protected void addSideInputValue(StreamRecord<RawUnionValue> streamRecord) {
   @Override
   protected DoFnRunner<InputT, OutputT> createWrappingDoFnRunner(
       DoFnRunner<InputT, OutputT> wrappedRunner) {
-    return new SdkHarnessDoFnRunner();
+    sdkHarnessRunner =
+        new SdkHarnessDoFnRunner<>(
+            executableStage.getInputPCollection().getId(),
+            stageBundleFactory,
+            stateRequestHandler,
+            progressHandler,
+            outputManager,
+            outputMap,
+            executableStage.getTimers(),
+            (Coder<BoundedWindow>) windowingStrategy.getWindowFn().windowCoder(),
+            keySelector,
+            timerInternals);
+    return sdkHarnessRunner;
+  }
+
+  @Override
+  public void processWatermark(Watermark mark) throws Exception {
+    // Due to the asynchronous communication with the SDK harness,
+    // a bundle might still be in progress and not all items have
+    // yet been received from the SDK harness. If we just set this
+    // watermark as the new output watermark, we could violate the
+    // order of the records, i.e. pending items in the SDK harness
+    // could become "late" although they were "on time".
+    //
+    // We can solve this problem using one of the following options:
+    //
+    // 1) Finish the current bundle and emit this watermark as the
+    //    new output watermark. Finishing the bundle ensures that
+    //    all the items have been processed by the SDK harness and
+    //    received by the outputQueue (see below), where they will
+    //    have been emitted to the output stream.
+    //
+    // 2) Put a hold on the output watermark for as long as the current
+    //    bundle has not been finished. We have to remember to manually
+    //    finish the bundle in case we receive the final watermark.
+    //    To avoid latency, we should process this watermark again as
+    //    soon as the current bundle is finished.
+    //
+    // Approach 1) is the easiest and gives better latency, yet 2)
+    // gives better throughput due to the bundle not getting cut on
+    // every watermark. So we have implemented 2) below.
+    //
+    if (sdkHarnessRunner.isBundleInProgress()) {
+      if (mark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+        invokeFinishBundle();
+        setPushedBackWatermark(Long.MAX_VALUE);
+      } else {
+        // It is not safe to advance the output watermark yet, so add a hold on the current
+        // output watermark.
+        setPushedBackWatermark(Math.min(currentOutputWatermark, getPushbackWatermarkHold()));
+        sdkHarnessRunner.setBundleFinishedCallback(
+            () -> {
+              try {
+                processWatermark(mark);
+              } catch (Exception e) {
+                throw new RuntimeException(
+                    "Failed to process pushed back watermark after finished bundle.", e);
+              }
+            });
+      }
+    }
+    super.processWatermark(mark);
   }
 
-  private class SdkHarnessDoFnRunner implements DoFnRunner<InputT, OutputT> {
+  private static class SdkHarnessDoFnRunner<InputT, OutputT>
+      implements DoFnRunner<InputT, OutputT> {
+
+    private final String mainInput;
+    private final LinkedBlockingQueue<KV<String, OutputT>> outputQueue;
+    private final StageBundleFactory stageBundleFactory;
+    private final StateRequestHandler stateRequestHandler;
+    private final BundleProgressHandler progressHandler;
+    private final BufferedOutputManager<OutputT> outputManager;
+    private final Map<String, TupleTag<?>> outputMap;
+    /** Timer Output PCollection id => TimerSpec. */
+    private final Map<String, TimerSpec> timerOutputIdToSpecMap;
+
+    private final Coder<BoundedWindow> windowCoder;
+    private final KeySelector<WindowedValue<InputT>, ?> keySelector;
+    private final TimerInternals timerInternals;
+
+    private RemoteBundle remoteBundle;
+    private FnDataReceiver<WindowedValue<?>> mainInputReceiver;
+    private Runnable bundleFinishedCallback;
+    // Timer key set before calling Flink's internal timer service. Used to
+    // avoid synchronizing on the state backend.
+    private Object keyForTimerToBeSet;
+    // Set before calling onTimer
+    private Object keyForTimerToBeFired;
+
+    public SdkHarnessDoFnRunner(
+        String mainInput,
+        StageBundleFactory stageBundleFactory,
+        StateRequestHandler stateRequestHandler,
+        BundleProgressHandler progressHandler,
+        BufferedOutputManager<OutputT> outputManager,
+        Map<String, TupleTag<?>> outputMap,
+        Collection<TimerReference> timers,
+        Coder<BoundedWindow> windowCoder,
+        KeySelector<WindowedValue<InputT>, ?> keySelector,
+        TimerInternals timerInternals) {
+      this.mainInput = mainInput;
+      this.stageBundleFactory = stageBundleFactory;
+      this.stateRequestHandler = stateRequestHandler;
+      this.progressHandler = progressHandler;
+      this.outputManager = outputManager;
+      this.outputMap = outputMap;
+      this.keySelector = keySelector;
+      this.timerInternals = timerInternals;
+      this.timerOutputIdToSpecMap = new HashMap<>();
+      // Gather all timers from all transforms by their output pCollectionId which is unique
+      for (Map<String, ProcessBundleDescriptors.TimerSpec> transformTimerMap :
+          stageBundleFactory.getProcessBundleDescriptor().getTimerSpecs().values()) {
+        for (ProcessBundleDescriptors.TimerSpec timerSpec : transformTimerMap.values()) {
+          timerOutputIdToSpecMap.put(timerSpec.outputCollectionId(), timerSpec);
+        }
+      }
+      this.windowCoder = windowCoder;
+      this.outputQueue = new LinkedBlockingQueue<>();
+    }
+
     @Override
     public void startBundle() {
-      checkState(
-          stageBundleFactory != null, "%s not yet prepared", StageBundleFactory.class.getName());
-      checkState(
-          stateRequestHandler != null, "%s not yet prepared", StateRequestHandler.class.getName());
       OutputReceiverFactory receiverFactory =
           new OutputReceiverFactory() {
             @Override
@@ -328,6 +481,10 @@ public void startBundle() {
       try {
         remoteBundle =
             stageBundleFactory.getBundle(receiverFactory, stateRequestHandler, progressHandler);
+        mainInputReceiver =
+            Preconditions.checkNotNull(
+                remoteBundle.getInputReceivers().get(mainInput),
+                "Failed to retrieve main input receiver.");
       } catch (Exception e) {
         throw new RuntimeException("Failed to start remote bundle", e);
       }
@@ -335,11 +492,9 @@ public void startBundle() {
 
     @Override
     public void processElement(WindowedValue<InputT> element) {
-      checkState(remoteBundle != null, "%s not yet prepared", RemoteBundle.class.getName());
       try {
-        LOG.debug(String.format("Sending value: %s", element));
-        // TODO(BEAM-4681): Add support to Flink to support portable timers.
-        Iterables.getOnlyElement(remoteBundle.getInputReceivers().values()).accept(element);
+        LOG.debug("Sending value: {}", element);
+        mainInputReceiver.accept(element);
       } catch (Exception e) {
         throw new RuntimeException("Failed to process element with SDK harness.", e);
       }
@@ -348,7 +503,30 @@ public void processElement(WindowedValue<InputT> element) {
 
     @Override
     public void onTimer(
-        String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {}
+        String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+      Preconditions.checkNotNull(
+          keyForTimerToBeFired, "Key for timer needs to be set before calling onTimer");
+      LOG.debug("timer callback: {} {} {} {}", timerId, window, timestamp, timeDomain);
+      FnDataReceiver<WindowedValue<?>> timerReceiver =
+          Preconditions.checkNotNull(
+              remoteBundle.getInputReceivers().get(timerId),
+              "No receiver found for timer %s",
+              timerId);
+      WindowedValue<KV<Object, Timer>> timerValue =
+          WindowedValue.of(
+              KV.of(keyForTimerToBeFired, Timer.of(timestamp, new byte[0])),
+              timestamp,
+              Collections.singleton(window),
+              PaneInfo.NO_FIRING);
+      try {
+        timerReceiver.accept(timerValue);
+      } catch (Exception e) {
+        throw new RuntimeException(
+            String.format(Locale.ENGLISH, "Failed to process timer %s", timerReceiver), e);
+      } finally {
+        keyForTimerToBeFired = null;
+      }
+    }
 
     @Override
     public void finishBundle() {
@@ -359,13 +537,80 @@ public void finishBundle() {
         emitResults();
       } catch (Exception e) {
         throw new RuntimeException("Failed to finish remote bundle", e);
+      } finally {
+        remoteBundle = null;
       }
+      if (bundleFinishedCallback != null) {
+        bundleFinishedCallback.run();
+        bundleFinishedCallback = null;
+      }
+    }
+
+    /** Key for timer which has not been registered yet. */
+    Object getTimerKeyForRegistration() {
+      return keyForTimerToBeSet;
+    }
+
+    /** Key for timer which is about to be fired. */
+    void setTimerKeyForFire(Object key) {
+      this.keyForTimerToBeFired = key;
+    }
+
+    boolean isBundleInProgress() {
+      return remoteBundle != null;
+    }
+
+    void setBundleFinishedCallback(Runnable callback) {
+      this.bundleFinishedCallback = callback;
     }
 
     private void emitResults() {
       KV<String, OutputT> result;
       while ((result = outputQueue.poll()) != null) {
-        outputManager.output(outputMap.get(result.getKey()), (WindowedValue) result.getValue());
+        final String outputPCollectionId = Preconditions.checkNotNull(result.getKey());
+        TupleTag<?> tag = outputMap.get(outputPCollectionId);
+        WindowedValue windowedValue =
+            Preconditions.checkNotNull(
+                (WindowedValue) result.getValue(),
+                "Received a null value from the SDK harness for %s",
+                outputPCollectionId);
+        if (tag != null) {
+          // process regular elements
+          outputManager.output(tag, windowedValue);
+        } else {
+          TimerSpec timerSpec =
+              Preconditions.checkNotNull(
+                  timerOutputIdToSpecMap.get(outputPCollectionId),
+                  "Unknown PCollection id %s",
+                  outputPCollectionId);
+          Timer timer =
+              Preconditions.checkNotNull(
+                  (Timer) ((KV) windowedValue.getValue()).getValue(),
+                  "Received null Timer from SDK harness: %s",
+                  windowedValue);
+          LOG.debug("Timer received: {} {}", outputPCollectionId, timer);
+          for (Object window : windowedValue.getWindows()) {
+            StateNamespace namespace = StateNamespaces.window(windowCoder, (BoundedWindow) window);
+            TimerInternals.TimerData timerData =
+                TimerInternals.TimerData.of(
+                    timerSpec.inputCollectionId(),
+                    namespace,
+                    timer.getTimestamp(),
+                    timerSpec.getTimerSpec().getTimeDomain());
+            setTimer(windowedValue, timerData);
+          }
+        }
+      }
+    }
+
+    private void setTimer(WindowedValue timerElement, TimerInternals.TimerData timerData) {
+      try {
+        keyForTimerToBeSet = keySelector.getKey(timerElement);
+        timerInternals.setTimer(timerData);
+      } catch (Exception e) {
+        throw new RuntimeException("Couldn't set timer", e);
+      } finally {
+        keyForTimerToBeSet = null;
       }
     }
 
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
new file mode 100644
index 00000000000..1d5226f7e1d
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executors;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.Environments;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.testing.CrashingRunner;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Tests the state and timer integration of {@link
+ * org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator}.
+ */
+@RunWith(Parameterized.class)
+public class PortableTimersExecutionTest implements Serializable {
+
+  @Parameters
+  // TODO(mxm) Enable for batch
+  public static Object[] testModes() {
+    return new Object[] {true};
+  }
+
+  @Parameter public boolean isStreaming;
+
+  private transient ListeningExecutorService flinkJobExecutor;
+
+  private static List<KV<String, Integer>> results = new ArrayList<>();
+
+  @Before
+  public void setup() {
+    results.clear();
+    flinkJobExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+  }
+
+  @After
+  public void tearDown() {
+    flinkJobExecutor.shutdown();
+  }
+
+  @Test(timeout = 60_000)
+  public void testTimerExecution() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setRunner(CrashingRunner.class);
+    options.as(FlinkPipelineOptions.class).setFlinkMaster("[local]");
+    options.as(FlinkPipelineOptions.class).setStreaming(isStreaming);
+    options
+        .as(PortablePipelineOptions.class)
+        .setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED);
+
+    final String timerId = "foo";
+    final String stateId = "sizzle";
+    final int offset = 5000;
+    final int timerOutput = 4093;
+    // Enough keys that we exercise interesting code paths
+    int numKeys = 50;
+    List<KV<String, Integer>> input = new ArrayList<>();
+    List<KV<String, Integer>> expectedOutput = new ArrayList<>();
+
+    for (Integer key = 0; key < numKeys; ++key) {
+      // Each key should have just one final output at GC time
+      expectedOutput.add(KV.of(key.toString(), timerOutput));
+
+      for (int i = 0; i < 15; ++i) {
+        // Each input should be output with the offset added
+        input.add(KV.of(key.toString(), i));
+        expectedOutput.add(KV.of(key.toString(), i + offset));
+      }
+    }
+    Collections.shuffle(input);
+
+    DoFn<byte[], KV<String, Integer>> inputFn =
+        new DoFn<byte[], KV<String, Integer>>() {
+          @ProcessElement
+          public void processElement(ProcessContext context) {
+            for (KV<String, Integer> stringIntegerKV : input) {
+              context.output(stringIntegerKV);
+            }
+          }
+        };
+
+    DoFn<KV<String, Integer>, KV<String, Integer>> testFn =
+        new DoFn<KV<String, Integer>, KV<String, Integer>>() {
+
+          @TimerId(timerId)
+          private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+          @StateId(stateId)
+          private final StateSpec<ValueState<String>> stateSpec =
+              StateSpecs.value(StringUtf8Coder.of());
+
+          @ProcessElement
+          public void processElement(
+              ProcessContext context,
+              @TimerId(timerId) Timer timer,
+              @StateId(stateId) ValueState<String> state,
+              BoundedWindow window) {
+            timer.set(window.maxTimestamp());
+            state.write(context.element().getKey());
+            context.output(
+                KV.of(context.element().getKey(), context.element().getValue() + offset));
+          }
+
+          @OnTimer(timerId)
+          public void onTimer(
+              @StateId(stateId) ValueState<String> state, OutputReceiver<KV<String, Integer>> r) {
+            r.output(KV.of(state.read(), timerOutput));
+          }
+        };
+
+    DoFn<KV<String, Integer>, Void> collectResults =
+        new DoFn<KV<String, Integer>, Void>() {
+          @ProcessElement
+          public void processElement(ProcessContext context) {
+            results.add(context.element());
+          }
+        };
+
+    final Pipeline pipeline = Pipeline.create(options);
+    pipeline
+        .apply(Impulse.create())
+        .apply(ParDo.of(inputFn))
+        .apply(ParDo.of(testFn))
+        .apply(ParDo.of(collectResults));
+
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline);
+
+    FlinkJobInvocation jobInvocation =
+        FlinkJobInvocation.create(
+            "id",
+            "none",
+            flinkJobExecutor,
+            pipelineProto,
+            options.as(FlinkPipelineOptions.class),
+            Collections.emptyList());
+
+    jobInvocation.start();
+    long timeout = System.currentTimeMillis() + 60 * 1000;
+    while (jobInvocation.getState() != Enum.DONE && System.currentTimeMillis() < timeout) {
+      Thread.sleep(1000);
+    }
+    assertThat(jobInvocation.getState(), is(Enum.DONE));
+    assertThat(results, containsInAnyOrder(expectedOutput.toArray()));
+  }
+}
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
index 68d00a6ecf0..cc282c530ed 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
@@ -55,6 +55,7 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.grpc.v1_13_1.com.google.protobuf.Struct;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.api.common.cache.DistributedCache;
@@ -85,6 +86,7 @@
   @Mock private FlinkExecutableStageContext stageContext;
   @Mock private StageBundleFactory stageBundleFactory;
   @Mock private StateRequestHandler stateRequestHandler;
+  @Mock private ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor;
 
   // NOTE: ExecutableStage.fromPayload expects exactly one input, so we provide one here. These unit
   // tests in general ignore the executable stage itself and mock around it.
@@ -104,6 +106,8 @@ public void setUpMocks() {
     MockitoAnnotations.initMocks(this);
     when(runtimeContext.getDistributedCache()).thenReturn(distributedCache);
     when(stageContext.getStageBundleFactory(any())).thenReturn(stageBundleFactory);
+    when(processBundleDescriptor.getTimerSpecs()).thenReturn(Collections.emptyMap());
+    when(stageBundleFactory.getProcessBundleDescriptor()).thenReturn(processBundleDescriptor);
   }
 
   @Test
@@ -125,7 +129,7 @@ public void sdkErrorsSurfaceOnClose() throws Exception {
 
     @SuppressWarnings("unchecked")
     FnDataReceiver<WindowedValue<?>> receiver = Mockito.mock(FnDataReceiver.class);
-    when(bundle.getInputReceivers()).thenReturn(ImmutableMap.of("pCollectionId", receiver));
+    when(bundle.getInputReceivers()).thenReturn(ImmutableMap.of("input", receiver));
 
     Exception expected = new RuntimeException(new Exception());
     doThrow(expected).when(bundle).close();
@@ -149,7 +153,7 @@ public void expectedInputsAreSent() throws Exception {
 
     @SuppressWarnings("unchecked")
     FnDataReceiver<WindowedValue<?>> receiver = Mockito.mock(FnDataReceiver.class);
-    when(bundle.getInputReceivers()).thenReturn(ImmutableMap.of("pCollectionId", receiver));
+    when(bundle.getInputReceivers()).thenReturn(ImmutableMap.of("input", receiver));
 
     WindowedValue<Integer> one = WindowedValue.valueInGlobalWindow(1);
     WindowedValue<Integer> two = WindowedValue.valueInGlobalWindow(2);
@@ -211,6 +215,9 @@ public void outputsAreTaggedCorrectly() throws Exception {
     // We use a real StageBundleFactory here in order to exercise the output receiver factory.
     StageBundleFactory stageBundleFactory =
         new StageBundleFactory() {
+
+          private boolean onceEmitted;
+
           @Override
           public RemoteBundle getBundle(
               OutputReceiverFactory receiverFactory,
@@ -225,7 +232,7 @@ public String getId() {
               @Override
               public Map<String, FnDataReceiver<WindowedValue<?>>> getInputReceivers() {
                 return ImmutableMap.of(
-                    "pCollectionId",
+                    "input",
                     input -> {
                       /* Ignore input*/
                     });
@@ -233,10 +240,14 @@ public String getId() {
 
               @Override
               public void close() throws Exception {
+                if (onceEmitted) {
+                  return;
+                }
                 // Emit all values to the runner when the bundle is closed.
                 receiverFactory.create(mainOutput.getId()).accept(three);
                 receiverFactory.create(additionalOutput1.getId()).accept(four);
                 receiverFactory.create(additionalOutput2.getId()).accept(five);
+                onceEmitted = true;
               }
             };
           }
@@ -244,7 +255,7 @@ public void close() throws Exception {
           @Override
           public ProcessBundleDescriptors.ExecutableProcessBundleDescriptor
               getProcessBundleDescriptor() {
-            return null;
+            return processBundleDescriptor;
           }
 
           @Override
@@ -291,11 +302,17 @@ public void testStageBundleClosed() throws Exception {
         new OneInputStreamOperatorTestHarness<>(operator);
 
     RemoteBundle bundle = Mockito.mock(RemoteBundle.class);
+    when(bundle.getInputReceivers())
+        .thenReturn(
+            ImmutableMap.<String, FnDataReceiver<WindowedValue>>builder()
+                .put("input", Mockito.mock(FnDataReceiver.class))
+                .build());
     when(stageBundleFactory.getBundle(any(), any(), any())).thenReturn(bundle);
 
     testHarness.open();
     testHarness.close();
 
+    verify(stageBundleFactory).getProcessBundleDescriptor();
     verify(stageBundleFactory).close();
     verify(stageContext).close();
     // DoFnOperator generates a final watermark, which triggers a new bundle.
@@ -350,6 +367,7 @@ public void testSerialization() {
             jobInfo,
             FlinkExecutableStageContext.factory(options),
             createOutputMap(mainOutput, ImmutableList.of(additionalOutput)),
+            WindowingStrategy.globalDefault(),
             null,
             null);
 
@@ -389,6 +407,7 @@ public void testSerialization() {
             jobInfo,
             contextFactory,
             createOutputMap(mainOutput, additionalOutputs),
+            WindowingStrategy.globalDefault(),
             null,
             null);
 
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java
index 69cd10c3752..af744eb05d1 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java
@@ -403,6 +403,7 @@ private static TargetEncoding addStageOutput(
               timerReference.transform().getId(),
               timerReference.localName(),
               inputTimerPCollectionId,
+              outputTimerPCollectionId,
               targetEncoding.getTarget(),
               spec));
     }
@@ -498,18 +499,21 @@ private static String keyValueCoderId(
     static <K, V, W extends BoundedWindow> TimerSpec<K, V, W> of(
         String transformId,
         String timerId,
-        String collectionId,
+        String inputCollectionId,
+        String outputCollectionId,
         Target outputTarget,
         org.apache.beam.sdk.state.TimerSpec timerSpec) {
       return new AutoValue_ProcessBundleDescriptors_TimerSpec(
-          transformId, timerId, collectionId, outputTarget, timerSpec);
+          transformId, timerId, inputCollectionId, outputCollectionId, outputTarget, timerSpec);
     }
 
     public abstract String transformId();
 
     public abstract String timerId();
 
-    public abstract String collectionId();
+    public abstract String inputCollectionId();
+
+    public abstract String outputCollectionId();
 
     public abstract Target outputTarget();
 
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 858735899c4..748efb47005 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -655,10 +655,10 @@ public void processingTimer(
         descriptor.getTimerSpecs().values()) {
       for (ProcessBundleDescriptors.TimerSpec timerSpec : timerSpecs.values()) {
         if (TimeDomain.EVENT_TIME.equals(timerSpec.getTimerSpec().getTimeDomain())) {
-          eventTimeInputPCollectionId = timerSpec.collectionId();
+          eventTimeInputPCollectionId = timerSpec.inputCollectionId();
           eventTimeOutputTarget = timerSpec.outputTarget();
         } else if (TimeDomain.PROCESSING_TIME.equals(timerSpec.getTimerSpec().getTimeDomain())) {
-          processingTimeInputPCollectionId = timerSpec.collectionId();
+          processingTimeInputPCollectionId = timerSpec.inputCollectionId();
           processingTimeOutputTarget = timerSpec.outputTarget();
         } else {
           fail(String.format("Unknown timer specification %s", timerSpec));
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 5cd43825ca8..dab9f133c0f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -36,9 +36,11 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -2839,7 +2841,20 @@ public void testAbsoluteProcessingTimeTimerRejected() throws Exception {
 
             @ProcessElement
             public void processElement(@TimerId(timerId) Timer timer) {
-              timer.set(new Instant(0));
+              try {
+                timer.set(new Instant(0));
+                fail("Should have failed due to processing time with absolute timer.");
+              } catch (RuntimeException e) {
+                String message = e.getMessage();
+                List<String> expectedSubstrings =
+                    Arrays.asList("relative timers", "processing time");
+                expectedSubstrings.forEach(
+                    str ->
+                        Preconditions.checkState(
+                            message.contains(str),
+                            "Pipeline didn't fail with the expected strings: %s",
+                            expectedSubstrings));
+              }
             }
 
             @OnTimer(timerId)
@@ -2847,11 +2862,6 @@ public void onTimer() {}
           };
 
       pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
-      thrown.expect(RuntimeException.class);
-      // Note that runners can reasonably vary their message - this matcher should be flexible
-      // and can be evolved.
-      thrown.expectMessage("relative timers");
-      thrown.expectMessage("processing time");
       pipeline.run();
     }
 
@@ -2873,7 +2883,19 @@ public void testOutOfBoundsEventTimeTimer() throws Exception {
             @ProcessElement
             public void processElement(
                 ProcessContext context, BoundedWindow window, @TimerId(timerId) Timer timer) {
-              timer.set(window.maxTimestamp().plus(1L));
+              try {
+                timer.set(window.maxTimestamp().plus(1L));
+                fail("Should have failed due to out-of-bounds event time timer.");
+              } catch (RuntimeException e) {
+                String message = e.getMessage();
+                List<String> expectedSubstrings = Arrays.asList("event time timer", "expiration");
+                expectedSubstrings.forEach(
+                    str ->
+                        Preconditions.checkState(
+                            message.contains(str),
+                            "Pipeline didn't fail with the expected strings: %s",
+                            expectedSubstrings));
+              }
             }
 
             @OnTimer(timerId)
@@ -2881,11 +2903,6 @@ public void onTimer() {}
           };
 
       pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
-      thrown.expect(RuntimeException.class);
-      // Note that runners can reasonably vary their message - this matcher should be flexible
-      // and can be evolved.
-      thrown.expectMessage("event time timer");
-      thrown.expectMessage("expiration");
       pipeline.run();
     }
 
diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
index c44a2bf0093..942faed0554 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -108,6 +108,7 @@ def test_no_subtransform_composite(self):
       raise unittest.SkipTest("BEAM-4781")
 
     def test_pardo_timers(self):
+      # TODO Enable once BEAM-5999 is fixed.
       raise unittest.SkipTest("BEAM-4681 - User timers not yet supported.")
 
     def test_assert_that(self):


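Second, the key round trip in fireTimer(): the key captured when the timer
was registered travels through Flink's timer service as a coder-encoded
ByteBuffer and is decoded again before the timer is handed back to the SDK
harness. A runnable sketch of just that encode/decode step, assuming a
String key with StringUtf8Coder (the operator works with whatever key coder
the stage uses):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.nio.ByteBuffer;
    import org.apache.beam.sdk.coders.StringUtf8Coder;

    public class KeyDecodeSketch {
      public static void main(String[] args) throws Exception {
        StringUtf8Coder keyCoder = StringUtf8Coder.of();

        // Encode the key as the KeySelector would before a timer is registered.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        keyCoder.encode("user-42", out);
        ByteBuffer encodedKey = ByteBuffer.wrap(out.toByteArray());

        // Decode it when the timer fires, mirroring fireTimer() above.
        byte[] bytes = encodedKey.array();
        String decodedKey = keyCoder.decode(new ByteArrayInputStream(bytes));
        System.out.println(decodedKey); // prints user-42
      }
    }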
 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 166016)
    Time Spent: 18.5h  (was: 18h 20m)

> Integrate support for timers using the portability APIs into Flink
> ------------------------------------------------------------------
>
>                 Key: BEAM-4681
>                 URL: https://issues.apache.org/jira/browse/BEAM-4681
>             Project: Beam
>          Issue Type: Sub-task
>          Components: runner-flink
>            Reporter: Luke Cwik
>            Assignee: Maximilian Michels
>            Priority: Major
>              Labels: portability, portability-flink
>          Time Spent: 18.5h
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
