[ 
https://issues.apache.org/jira/browse/BEAM-4653?focusedWorklogId=125236&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-125236
 ]

ASF GitHub Bot logged work on BEAM-4653:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Jul/18 20:24
            Start Date: 19/Jul/18 20:24
    Worklog Time Spent: 10m 
      Work Description: lukecwik closed pull request #5898: [BEAM-4653] Add 
support to the Java SDK harness to execute timers.
URL: https://github.com/apache/beam/pull/5898
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 6003aa23f0d..433533ed4fd 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -961,8 +961,24 @@ public TimerInternalsTimer(
 
     @Override
     public void set(Instant target) {
-      verifyAbsoluteTimeDomain();
-      verifyTargetTime(target);
+      // Verifies that the time domain of this timer is acceptable for 
absolute timers.
+      if (!TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
+        throw new IllegalStateException(
+            "Can only set relative timers in processing time domain. Use 
#setRelative()");
+      }
+
+      // Ensures that the target time is reasonable. For event time timers 
this means that the time
+      // should be prior to window GC time.
+      if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
+        Instant windowExpiry = window.maxTimestamp().plus(allowedLateness);
+        checkArgument(
+            !target.isAfter(windowExpiry),
+            "Attempted to set event time timer for %s but that is after"
+                + " the expiration of window %s",
+            target,
+            windowExpiry);
+      }
+
       setUnderlyingTimer(target);
     }
 
@@ -1006,30 +1022,6 @@ private Instant minTargetAndGcTime(Instant target) {
       return target;
     }
 
-    /**
-     * Ensures that the target time is reasonable. For event time timers this 
means that the time
-     * should be prior to window GC time.
-     */
-    private void verifyTargetTime(Instant target) {
-      if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
-        Instant windowExpiry = window.maxTimestamp().plus(allowedLateness);
-        checkArgument(
-            !target.isAfter(windowExpiry),
-            "Attempted to set event time timer for %s but that is after"
-                + " the expiration of window %s",
-            target,
-            windowExpiry);
-      }
-    }
-
-    /** Verifies that the time domain of this timer is acceptable for absolute 
timers. */
-    private void verifyAbsoluteTimeDomain() {
-      if (!TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
-        throw new IllegalStateException(
-            "Cannot only set relative timers in processing time domain." + " 
Use #setRelative()");
-      }
-    }
-
     /**
      * Sets the timer for the target time without checking anything about 
whether it is a reasonable
      * thing to do. For example, absolute processing time timers are not 
really sensible since the
diff --git a/sdks/java/harness/build.gradle b/sdks/java/harness/build.gradle
index bdf20048bab..ce6b2db3370 100644
--- a/sdks/java/harness/build.gradle
+++ b/sdks/java/harness/build.gradle
@@ -59,4 +59,5 @@ dependencies {
   testCompile library.java.junit
   testCompile library.java.mockito_core
   testCompile library.java.slf4j_jdk14
+  testCompile project(path: ":beam-sdks-java-core", configuration: 
"shadowTest")
 }
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
index cb3c731545c..6ec31551288 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
@@ -22,7 +22,7 @@
 
 import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Multimap;
+import com.google.common.collect.ListMultimap;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
@@ -87,7 +87,7 @@
         Map<String, PCollection> pCollections,
         Map<String, RunnerApi.Coder> coders,
         Map<String, RunnerApi.WindowingStrategy> windowingStrategies,
-        Multimap<String, FnDataReceiver<WindowedValue<?>>> 
pCollectionIdsToConsumers,
+        ListMultimap<String, FnDataReceiver<WindowedValue<?>>> 
pCollectionIdsToConsumers,
         Consumer<ThrowingRunnable> addStartFunction,
         Consumer<ThrowingRunnable> addFinishFunction,
         BundleSplitListener splitListener)
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
index 8d5ab91d25d..02f3babc817 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
@@ -22,7 +22,7 @@
 
 import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Multimap;
+import com.google.common.collect.ListMultimap;
 import java.io.IOException;
 import java.util.Map;
 import java.util.function.Consumer;
@@ -81,7 +81,7 @@
         Map<String, PCollection> pCollections,
         Map<String, RunnerApi.Coder> coders,
         Map<String, RunnerApi.WindowingStrategy> windowingStrategies,
-        Multimap<String, FnDataReceiver<WindowedValue<?>>> 
pCollectionIdsToConsumers,
+        ListMultimap<String, FnDataReceiver<WindowedValue<?>>> 
pCollectionIdsToConsumers,
         Consumer<ThrowingRunnable> addStartFunction,
         Consumer<ThrowingRunnable> addFinishFunction,
         BundleSplitListener splitListener)
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
index 1d40ca3dad9..323edfb3371 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
@@ -21,7 +21,7 @@
 import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Multimap;
+import com.google.common.collect.ListMultimap;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
@@ -79,7 +79,7 @@
         Map<String, PCollection> pCollections,
         Map<String, Coder> coders,
         Map<String, RunnerApi.WindowingStrategy> windowingStrategies,
-        Multimap<String, FnDataReceiver<WindowedValue<?>>> 
pCollectionIdsToConsumers,
+        ListMultimap<String, FnDataReceiver<WindowedValue<?>>> 
pCollectionIdsToConsumers,
         Consumer<ThrowingRunnable> addStartFunction,
         Consumer<ThrowingRunnable> addFinishFunction,
         BundleSplitListener splitListener) {
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
index 178c4c1c0d8..e2d21b71ac7 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
@@ -21,7 +21,7 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
+import com.google.common.collect.ListMultimap;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
@@ -125,7 +125,7 @@ void finishBundle() throws Exception {
         Map<String, PCollection> pCollections,
         Map<String, RunnerApi.Coder> coders,
         Map<String, RunnerApi.WindowingStrategy> windowingStrategies,
-        Multimap<String, FnDataReceiver<WindowedValue<?>>> 
pCollectionIdsToConsumers,
+        ListMultimap<String, FnDataReceiver<WindowedValue<?>>> 
pCollectionIdsToConsumers,
         Consumer<ThrowingRunnable> addStartFunction,
         Consumer<ThrowingRunnable> addFinishFunction,
         BundleSplitListener splitListener)
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/DoFnPTransformRunnerFactory.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/DoFnPTransformRunnerFactory.java
index f7dcb5fd79d..61a341eaed2 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/DoFnPTransformRunnerFactory.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/DoFnPTransformRunnerFactory.java
@@ -24,7 +24,6 @@
 import com.google.common.collect.Iterables;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.util.Map;
@@ -41,16 +40,22 @@
 import org.apache.beam.runners.core.construction.PCollectionViewTranslation;
 import org.apache.beam.runners.core.construction.ParDoTranslation;
 import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.Timer;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.fn.function.ThrowingRunnable;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 
@@ -66,6 +71,9 @@
 
     void processElement(WindowedValue<T> input) throws Exception;
 
+    void processTimer(
+        String timerId, TimeDomain timeDomain, WindowedValue<KV<Object, 
Timer>> input);
+
     void finishBundle() throws Exception;
   }
 
@@ -80,7 +88,7 @@ public final RunnerT createRunnerForPTransform(
       Map<String, PCollection> pCollections,
       Map<String, RunnerApi.Coder> coders,
       Map<String, RunnerApi.WindowingStrategy> windowingStrategies,
-      Multimap<String, FnDataReceiver<WindowedValue<?>>> 
pCollectionIdsToConsumers,
+      ListMultimap<String, FnDataReceiver<WindowedValue<?>>> 
pCollectionIdsToConsumers,
       Consumer<ThrowingRunnable> addStartFunction,
       Consumer<ThrowingRunnable> addFinishFunction,
       BundleSplitListener splitListener) {
@@ -103,12 +111,28 @@ public final RunnerT createRunnerForPTransform(
     addStartFunction.accept(runner::startBundle);
     Iterable<String> mainInput =
         Sets.difference(
-            pTransform.getInputsMap().keySet(), 
context.parDoPayload.getSideInputsMap().keySet());
+            pTransform.getInputsMap().keySet(),
+            Sets.union(
+                context.parDoPayload.getSideInputsMap().keySet(),
+                context.parDoPayload.getTimerSpecsMap().keySet()));
     for (String localInputName : mainInput) {
       pCollectionIdsToConsumers.put(
           pTransform.getInputsOrThrow(localInputName),
           (FnDataReceiver) (FnDataReceiver<WindowedValue<TransformInputT>>) 
runner::processElement);
     }
+
+    // Register as a consumer for each timer PCollection.
+    for (String localName : context.parDoPayload.getTimerSpecsMap().keySet()) {
+      TimeDomain timeDomain =
+          DoFnSignatures.getTimerSpecOrThrow(
+                  context.doFnSignature.timerDeclarations().get(localName), 
context.doFn)
+              .getTimeDomain();
+      pCollectionIdsToConsumers.put(
+          pTransform.getInputsOrThrow(localName),
+          (timer) ->
+              runner.processTimer(localName, timeDomain, 
(WindowedValue<KV<Object, Timer>>) timer));
+    }
+
     addFinishFunction.accept(runner::finishBundle);
     return runner;
   }
@@ -123,6 +147,7 @@ public final RunnerT createRunnerForPTransform(
     final Supplier<String> processBundleInstructionId;
     final RehydratedComponents rehydratedComponents;
     final DoFn<InputT, OutputT> doFn;
+    final DoFnSignature doFnSignature;
     final TupleTag<OutputT> mainOutputTag;
     final Coder<?> inputCoder;
     final Coder<?> keyCoder;
@@ -131,7 +156,7 @@ public final RunnerT createRunnerForPTransform(
     final Map<TupleTag<?>, SideInputSpec> tagToSideInputSpecMap;
     Map<TupleTag<?>, Coder<?>> outputCoders;
     final ParDoPayload parDoPayload;
-    final ListMultimap<TupleTag<?>, FnDataReceiver<WindowedValue<?>>> 
tagToConsumer;
+    final ListMultimap<String, FnDataReceiver<WindowedValue<?>>> 
localNameToConsumer;
     final BundleSplitListener splitListener;
 
     Context(
@@ -143,7 +168,7 @@ public final RunnerT createRunnerForPTransform(
         Map<String, PCollection> pCollections,
         Map<String, RunnerApi.Coder> coders,
         Map<String, RunnerApi.WindowingStrategy> windowingStrategies,
-        Multimap<String, FnDataReceiver<WindowedValue<?>>> 
pCollectionIdsToConsumers,
+        ListMultimap<String, FnDataReceiver<WindowedValue<?>>> 
pCollectionIdsToConsumers,
         BundleSplitListener splitListener) {
       this.pipelineOptions = pipelineOptions;
       this.beamFnStateClient = beamFnStateClient;
@@ -155,17 +180,23 @@ public final RunnerT createRunnerForPTransform(
       try {
         rehydratedComponents =
             RehydratedComponents.forComponents(
-                RunnerApi.Components.newBuilder()
-                    .putAllCoders(coders)
-                    .putAllWindowingStrategies(windowingStrategies)
-                    .build());
+                    RunnerApi.Components.newBuilder()
+                        .putAllCoders(coders)
+                        .putAllPcollections(pCollections)
+                        .putAllWindowingStrategies(windowingStrategies)
+                        .build())
+                .withPipeline(Pipeline.create());
         parDoPayload = 
ParDoPayload.parseFrom(pTransform.getSpec().getPayload());
         doFn = (DoFn) ParDoTranslation.getDoFn(parDoPayload);
+        doFnSignature = DoFnSignatures.signatureForDoFn(doFn);
         mainOutputTag = (TupleTag) 
ParDoTranslation.getMainOutputTag(parDoPayload);
         String mainInputTag =
             Iterables.getOnlyElement(
                 Sets.difference(
-                    pTransform.getInputsMap().keySet(), 
parDoPayload.getSideInputsMap().keySet()));
+                    pTransform.getInputsMap().keySet(),
+                    Sets.union(
+                        parDoPayload.getSideInputsMap().keySet(),
+                        parDoPayload.getTimerSpecsMap().keySet())));
         PCollection mainInput = 
pCollections.get(pTransform.getInputsOrThrow(mainInputTag));
         inputCoder = rehydratedComponents.getCoder(mainInput.getCoderId());
         if (inputCoder instanceof KvCoder
@@ -225,13 +256,13 @@ public final RunnerT createRunnerForPTransform(
         throw new IllegalArgumentException("Malformed ParDoPayload", exn);
       }
 
-      ImmutableListMultimap.Builder<TupleTag<?>, 
FnDataReceiver<WindowedValue<?>>>
-          tagToConsumerBuilder = ImmutableListMultimap.builder();
+      ImmutableListMultimap.Builder<String, FnDataReceiver<WindowedValue<?>>>
+          localNameToConsumerBuilder = ImmutableListMultimap.builder();
       for (Map.Entry<String, String> entry : 
pTransform.getOutputsMap().entrySet()) {
-        tagToConsumerBuilder.putAll(
-            new TupleTag<>(entry.getKey()), 
pCollectionIdsToConsumers.get(entry.getValue()));
+        localNameToConsumerBuilder.putAll(
+            entry.getKey(), pCollectionIdsToConsumers.get(entry.getValue()));
       }
-      tagToConsumer = tagToConsumerBuilder.build();
+      localNameToConsumer = localNameToConsumerBuilder.build();
       tagToSideInputSpecMap = tagToSideInputSpecMapBuilder.build();
       this.splitListener = splitListener;
     }
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FlattenRunner.java 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FlattenRunner.java
index f50c49944c6..8b764206cfe 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FlattenRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FlattenRunner.java
@@ -22,7 +22,7 @@
 import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Multimap;
+import com.google.common.collect.ListMultimap;
 import java.io.IOException;
 import java.util.Map;
 import java.util.function.Consumer;
@@ -65,7 +65,7 @@
         Map<String, PCollection> pCollections,
         Map<String, Coder> coders,
         Map<String, RunnerApi.WindowingStrategy> windowingStrategies,
-        Multimap<String, FnDataReceiver<WindowedValue<?>>> 
pCollectionIdsToConsumers,
+        ListMultimap<String, FnDataReceiver<WindowedValue<?>>> 
pCollectionIdsToConsumers,
         Consumer<ThrowingRunnable> addStartFunction,
         Consumer<ThrowingRunnable> addFinishFunction,
         BundleSplitListener splitListener)
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 392bc46eac5..b8a02f673a6 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -23,6 +23,7 @@
 
 import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
@@ -30,7 +31,9 @@
 import org.apache.beam.fn.harness.DoFnPTransformRunnerFactory.Context;
 import org.apache.beam.fn.harness.state.FnApiStateAccessor;
 import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.LateDataUtils;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.Timer;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -39,7 +42,6 @@
 import org.apache.beam.sdk.state.State;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.TimeDomain;
-import org.apache.beam.sdk.state.Timer;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
@@ -50,15 +52,19 @@
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.FieldAccessDeclaration;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RowParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.DateTimeUtils;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 
 /**
@@ -91,16 +97,16 @@
   private final Context<InputT, OutputT> context;
   private final Collection<FnDataReceiver<WindowedValue<OutputT>>> 
mainOutputConsumers;
   private FnApiStateAccessor stateAccessor;
-  private final DoFnSignature doFnSignature;
   private final DoFnInvoker<InputT, OutputT> doFnInvoker;
   private final DoFn<InputT, OutputT>.StartBundleContext startBundleContext;
   private final ProcessBundleContext processContext;
+  private final OnTimerContext onTimerContext;
   private final DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext;
 
   /** Only valid during {@link #processElement}, null otherwise. */
   private WindowedValue<InputT> currentElement;
 
-  /** Only valid during {@link #processElement}, null otherwise. */
+  /** Only valid during {@link #processElement} and {@link #processTimer}, 
null otherwise. */
   private BoundedWindow currentWindow;
 
   /** Following fields are only valid if a Schema is set, null otherwise. */
@@ -109,13 +115,18 @@
   @Nullable private final SchemaCoder<OutputT> mainOutputSchemaCoder;
   @Nullable private final FieldAccessDescriptor fieldAccessDescriptor;
 
+  /** Only valid during {@link #processTimer}, null otherwise. */
+  private WindowedValue<KV<Object, Timer>> currentTimer;
+
+  /** Only valid during {@link #processTimer}, null otherwise. */
+  private TimeDomain currentTimeDomain;
+
   FnApiDoFnRunner(Context<InputT, OutputT> context) {
     this.context = context;
 
     this.mainOutputConsumers =
         (Collection<FnDataReceiver<WindowedValue<OutputT>>>)
-            (Collection) context.tagToConsumer.get(context.mainOutputTag);
-    this.doFnSignature = DoFnSignatures.signatureForDoFn(context.doFn);
+            (Collection) 
context.localNameToConsumer.get(context.mainOutputTag.getId());
     this.doFnInvoker = DoFnInvokers.invokerFor(context.doFn);
     this.doFnInvoker.invokeSetup();
 
@@ -127,7 +138,8 @@ public PipelineOptions getPipelineOptions() {
           }
         };
     this.processContext = new ProcessBundleContext();
-    finishBundleContext =
+    this.onTimerContext = new OnTimerContext();
+    this.finishBundleContext =
         this.context.doFn.new FinishBundleContext() {
           @Override
           public PipelineOptions getPipelineOptions() {
@@ -145,7 +157,7 @@ public void output(OutputT output, Instant timestamp, 
BoundedWindow window) {
           public <T> void output(
               TupleTag<T> tag, T output, Instant timestamp, BoundedWindow 
window) {
             Collection<FnDataReceiver<WindowedValue<T>>> consumers =
-                (Collection) context.tagToConsumer.get(tag);
+                (Collection) context.localNameToConsumer.get(tag.getId());
             if (consumers == null) {
               throw new IllegalArgumentException(String.format("Unknown output 
tag %s", tag));
             }
@@ -235,6 +247,25 @@ public void processElement(WindowedValue<InputT> elem) {
     }
   }
 
+  @Override
+  public void processTimer(
+      String timerId, TimeDomain timeDomain, WindowedValue<KV<Object, Timer>> 
timer) {
+    currentTimer = timer;
+    currentTimeDomain = timeDomain;
+    try {
+      Iterator<BoundedWindow> windowIterator =
+          (Iterator<BoundedWindow>) timer.getWindows().iterator();
+      while (windowIterator.hasNext()) {
+        currentWindow = windowIterator.next();
+        doFnInvoker.invokeOnTimer(timerId, onTimerContext);
+      }
+    } finally {
+      currentTimer = null;
+      currentTimeDomain = null;
+      currentWindow = null;
+    }
+  }
+
   @Override
   public void finishBundle() {
     doFnInvoker.invokeFinishBundle(finishBundleContext);
@@ -256,6 +287,125 @@ public void finishBundle() {
     }
   }
 
+  private class FnApiTimer implements org.apache.beam.sdk.state.Timer {
+    private final String timerId;
+    private final TimeDomain timeDomain;
+    private final Instant currentTimestamp;
+    private final Duration allowedLateness;
+    private final WindowedValue<?> currentElementOrTimer;
+
+    private Duration period = Duration.ZERO;
+    private Duration offset = Duration.ZERO;
+
+    FnApiTimer(String timerId, WindowedValue<KV<?, ?>> currentElementOrTimer) {
+      this.timerId = timerId;
+      this.currentElementOrTimer = currentElementOrTimer;
+
+      TimerDeclaration timerDeclaration = 
context.doFnSignature.timerDeclarations().get(timerId);
+      this.timeDomain =
+          DoFnSignatures.getTimerSpecOrThrow(timerDeclaration, 
context.doFn).getTimeDomain();
+
+      switch (timeDomain) {
+        case EVENT_TIME:
+          this.currentTimestamp = currentElementOrTimer.getTimestamp();
+          break;
+        case PROCESSING_TIME:
+          this.currentTimestamp = new 
Instant(DateTimeUtils.currentTimeMillis());
+          break;
+        case SYNCHRONIZED_PROCESSING_TIME:
+          this.currentTimestamp = new 
Instant(DateTimeUtils.currentTimeMillis());
+          break;
+        default:
+          throw new IllegalArgumentException(String.format("Unknown time 
domain %s", timeDomain));
+      }
+
+      try {
+        this.allowedLateness =
+            context
+                .rehydratedComponents
+                .getPCollection(context.pTransform.getInputsOrThrow(timerId))
+                .getWindowingStrategy()
+                .getAllowedLateness();
+      } catch (IOException e) {
+        throw new IllegalArgumentException(
+            String.format("Unable to get allowed lateness for timer %s", 
timerId));
+      }
+    }
+
+    @Override
+    public void set(Instant absoluteTime) {
+      // Verifies that the time domain of this timer is acceptable for 
absolute timers.
+      if (!TimeDomain.EVENT_TIME.equals(timeDomain)) {
+        throw new IllegalArgumentException(
+            "Can only set relative timers in processing time domain. Use 
#setRelative()");
+      }
+
+      // Ensures that the target time is reasonable. For event time timers 
this means that the time
+      // should be prior to window GC time.
+      if (TimeDomain.EVENT_TIME.equals(timeDomain)) {
+        Instant windowExpiry = 
LateDataUtils.garbageCollectionTime(currentWindow, allowedLateness);
+        checkArgument(
+            !absoluteTime.isAfter(windowExpiry),
+            "Attempted to set event time timer for %s but that is after"
+                + " the expiration of window %s",
+            absoluteTime,
+            windowExpiry);
+      }
+
+      output(absoluteTime);
+    }
+
+    @Override
+    public void setRelative() {
+      Instant target;
+      if (period.equals(Duration.ZERO)) {
+        target = currentTimestamp.plus(offset);
+      } else {
+        long millisSinceStart = currentTimestamp.plus(offset).getMillis() % 
period.getMillis();
+        target =
+            millisSinceStart == 0
+                ? currentTimestamp
+                : currentTimestamp.plus(period).minus(millisSinceStart);
+      }
+      target = minTargetAndGcTime(target);
+      output(target);
+    }
+
+    @Override
+    public org.apache.beam.sdk.state.Timer offset(Duration offset) {
+      this.offset = offset;
+      return this;
+    }
+
+    @Override
+    public org.apache.beam.sdk.state.Timer align(Duration period) {
+      this.period = period;
+      return this;
+    }
+
+    /**
+     * For event time timers the target time should be prior to window GC 
time. So it returns
+     * min(time to set, GC Time of window).
+     */
+    private Instant minTargetAndGcTime(Instant target) {
+      if (TimeDomain.EVENT_TIME.equals(timeDomain)) {
+        Instant windowExpiry = 
LateDataUtils.garbageCollectionTime(currentWindow, allowedLateness);
+        if (target.isAfter(windowExpiry)) {
+          return windowExpiry;
+        }
+      }
+      return target;
+    }
+
+    private void output(Instant scheduledTime) {
+      Object key = ((KV) currentElementOrTimer.getValue()).getKey();
+      Collection<FnDataReceiver<WindowedValue<KV<Object, Timer>>>> consumers =
+          (Collection) context.localNameToConsumer.get(timerId);
+
+      outputTo(consumers, currentElementOrTimer.withValue(KV.of(key, 
Timer.of(scheduledTime))));
+    }
+  }
+
   /**
    * Provides arguments for a {@link DoFnInvoker} for {@link 
DoFn.ProcessElement @ProcessElement}.
    */
@@ -333,17 +483,18 @@ public MultiOutputReceiver 
taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {
 
     @Override
     public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, 
OutputT> doFn) {
-      throw new UnsupportedOperationException("TODO: Add support for timers");
+      throw new UnsupportedOperationException(
+          "Cannot access OnTimerContext outside of @OnTimer methods.");
     }
 
     @Override
     public RestrictionTracker<?, ?> restrictionTracker() {
-      throw new UnsupportedOperationException("TODO: Add support for 
SplittableDoFn");
+      throw new UnsupportedOperationException("RestrictionTracker parameters 
are not supported.");
     }
 
     @Override
     public State state(String stateId) {
-      StateDeclaration stateDeclaration = 
doFnSignature.stateDeclarations().get(stateId);
+      StateDeclaration stateDeclaration = 
context.doFnSignature.stateDeclarations().get(stateId);
       checkNotNull(stateDeclaration, "No state declaration found for %s", 
stateId);
       StateSpec<?> spec;
       try {
@@ -355,8 +506,13 @@ public State state(String stateId) {
     }
 
     @Override
-    public Timer timer(String timerId) {
-      throw new UnsupportedOperationException("TODO: Add support for timers");
+    public org.apache.beam.sdk.state.Timer timer(String timerId) {
+      checkState(
+          currentElement.getValue() instanceof KV,
+          "Accessing timer in unkeyed context. Current element is not a KV: 
%s.",
+          currentElement.getValue());
+
+      return new FnApiTimer(timerId, (WindowedValue) currentElement);
     }
 
     @Override
@@ -387,7 +543,7 @@ public void outputWithTimestamp(OutputT output, Instant 
timestamp) {
     @Override
     public <T> void output(TupleTag<T> tag, T output) {
       Collection<FnDataReceiver<WindowedValue<T>>> consumers =
-          (Collection) context.tagToConsumer.get(tag);
+          (Collection) context.localNameToConsumer.get(tag.getId());
       if (consumers == null) {
         throw new IllegalArgumentException(String.format("Unknown output tag 
%s", tag));
       }
@@ -400,7 +556,7 @@ public void outputWithTimestamp(OutputT output, Instant 
timestamp) {
     @Override
     public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
       Collection<FnDataReceiver<WindowedValue<T>>> consumers =
-          (Collection) context.tagToConsumer.get(tag);
+          (Collection) context.localNameToConsumer.get(tag.getId());
       if (consumers == null) {
         throw new IllegalArgumentException(String.format("Unknown output tag 
%s", tag));
       }
@@ -433,4 +589,178 @@ public void updateWatermark(Instant watermark) {
       throw new UnsupportedOperationException("TODO: Add support for 
SplittableDoFn");
     }
   }
+
+  /** Provides arguments for a {@link DoFnInvoker} for {@link DoFn.OnTimer 
@OnTimer}. */
+  private class OnTimerContext extends DoFn<InputT, OutputT>.OnTimerContext
+      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
+
+    private OnTimerContext() {
+      context.doFn.super();
+    }
+
+    @Override
+    public BoundedWindow window() {
+      return currentWindow;
+    }
+
+    @Override
+    public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access paneInfo outside of @ProcessElement methods.");
+    }
+
+    @Override
+    public DoFn<InputT, OutputT>.StartBundleContext 
startBundleContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access StartBundleContext outside of @StartBundle method.");
+    }
+
+    @Override
+    public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext(
+        DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access FinishBundleContext outside of @FinishBundle 
method.");
+    }
+
+    @Override
+    public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, 
OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access ProcessContext outside of @ProcessElement method.");
+    }
+
+    @Override
+    public InputT element(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException("Element parameters are not 
supported.");
+    }
+
+    @Override
+    public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+      return timestamp();
+    }
+
+    @Override
+    public Row asRow(@Nullable String id) {
+      throw new UnsupportedOperationException(
+          "Cannot access element outside of @ProcessElement method.");
+    }
+
+    @Override
+    public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
+      return timeDomain();
+    }
+
+    @Override
+    public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
+      return DoFnOutputReceivers.windowedReceiver(this, null);
+    }
+
+    @Override
+    public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
+      return DoFnOutputReceivers.rowReceiver(this, null, 
mainOutputSchemaCoder);
+    }
+
+    @Override
+    public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> 
doFn) {
+      return DoFnOutputReceivers.windowedMultiReceiver(this);
+    }
+
+    @Override
+    public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, 
OutputT> doFn) {
+      return this;
+    }
+
+    @Override
+    public RestrictionTracker<?, ?> restrictionTracker() {
+      throw new UnsupportedOperationException("RestrictionTracker parameters 
are not supported.");
+    }
+
+    @Override
+    public State state(String stateId) {
+      StateDeclaration stateDeclaration = 
context.doFnSignature.stateDeclarations().get(stateId);
+      checkNotNull(stateDeclaration, "No state declaration found for %s", 
stateId);
+      StateSpec<?> spec;
+      try {
+        spec = (StateSpec<?>) stateDeclaration.field().get(context.doFn);
+      } catch (IllegalAccessException e) {
+        throw new RuntimeException(e);
+      }
+      return spec.bind(stateId, stateAccessor);
+    }
+
+    @Override
+    public org.apache.beam.sdk.state.Timer timer(String timerId) {
+      checkState(
+          currentTimer.getValue() instanceof KV,
+          "Accessing timer in unkeyed context. Current timer is not a KV: %s.",
+          currentTimer);
+
+      return new FnApiTimer(timerId, (WindowedValue) currentTimer);
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return context.pipelineOptions;
+    }
+
+    @Override
+    public PipelineOptions pipelineOptions() {
+      return context.pipelineOptions;
+    }
+
+    @Override
+    public void output(OutputT output) {
+      outputTo(
+          mainOutputConsumers,
+          WindowedValue.of(output, currentTimer.getTimestamp(), currentWindow, 
PaneInfo.NO_FIRING));
+    }
+
+    @Override
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      checkArgument(
+          !currentTimer.getTimestamp().isAfter(timestamp),
+          "Output time %s can not be before timer timestamp %s.",
+          timestamp,
+          currentTimer.getTimestamp());
+      outputTo(
+          mainOutputConsumers,
+          WindowedValue.of(output, timestamp, currentWindow, 
PaneInfo.NO_FIRING));
+    }
+
+    @Override
+    public <T> void output(TupleTag<T> tag, T output) {
+      Collection<FnDataReceiver<WindowedValue<T>>> consumers =
+          (Collection) context.localNameToConsumer.get(tag.getId());
+      if (consumers == null) {
+        throw new IllegalArgumentException(String.format("Unknown output tag 
%s", tag));
+      }
+      outputTo(
+          consumers,
+          WindowedValue.of(output, currentTimer.getTimestamp(), currentWindow, 
PaneInfo.NO_FIRING));
+    }
+
+    @Override
+    public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
+      checkArgument(
+          !currentTimer.getTimestamp().isAfter(timestamp),
+          "Output time %s can not be before timer timestamp %s.",
+          timestamp,
+          currentTimer.getTimestamp());
+      Collection<FnDataReceiver<WindowedValue<T>>> consumers =
+          (Collection) context.localNameToConsumer.get(tag.getId());
+      if (consumers == null) {
+        throw new IllegalArgumentException(String.format("Unknown output tag 
%s", tag));
+      }
+      outputTo(consumers, WindowedValue.of(output, timestamp, currentWindow, 
PaneInfo.NO_FIRING));
+    }
+
+    @Override
+    public TimeDomain timeDomain() {
+      return currentTimeDomain;
+    }
+
+    @Override
+    public Instant timestamp() {
+      return currentTimer.getTimestamp();
+    }
+  }
 }
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/MapFnRunners.java 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/MapFnRunners.java
index e2c61de7183..f0e8c684b16 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/MapFnRunners.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/MapFnRunners.java
@@ -21,7 +21,7 @@
 import static com.google.common.collect.Iterables.getOnlyElement;
 
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
+import com.google.common.collect.ListMultimap;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
@@ -108,7 +108,7 @@ private Factory(MapperFactory<InputT, OutputT> 
mapperFactory) {
         Map<String, PCollection> pCollections,
         Map<String, RunnerApi.Coder> coders,
         Map<String, RunnerApi.WindowingStrategy> windowingStrategies,
-        Multimap<String, FnDataReceiver<WindowedValue<?>>> 
pCollectionIdsToConsumers,
+        ListMultimap<String, FnDataReceiver<WindowedValue<?>>> 
pCollectionIdsToConsumers,
         Consumer<ThrowingRunnable> addStartFunction,
         Consumer<ThrowingRunnable> addFinishFunction,
         BundleSplitListener splitListener)
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java
index edac09ea135..3e663d803c7 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.fn.harness;
 
-import com.google.common.collect.Multimap;
+import com.google.common.collect.ListMultimap;
 import java.io.IOException;
 import java.util.Map;
 import java.util.function.Consumer;
@@ -70,7 +70,7 @@ T createRunnerForPTransform(
       Map<String, PCollection> pCollections,
       Map<String, Coder> coders,
       Map<String, RunnerApi.WindowingStrategy> windowingStrategies,
-      Multimap<String, FnDataReceiver<WindowedValue<?>>> 
pCollectionIdsToConsumers,
+      ListMultimap<String, FnDataReceiver<WindowedValue<?>>> 
pCollectionIdsToConsumers,
       Consumer<ThrowingRunnable> addStartFunction,
       Consumer<ThrowingRunnable> addFinishFunction,
       BundleSplitListener splitListener)
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
index 1af56c73e55..6ade4bbe234 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
@@ -36,9 +36,11 @@
 import org.apache.beam.runners.core.OutputWindowedValue;
 import org.apache.beam.runners.core.SplittableProcessElementInvoker;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.Timer;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
@@ -82,7 +84,7 @@
           context,
           windowedCoder,
           (Collection<FnDataReceiver<WindowedValue<OutputT>>>)
-              (Collection) context.tagToConsumer.get(context.mainOutputTag),
+              (Collection) 
context.localNameToConsumer.get(context.mainOutputTag.getId()),
           
Iterables.getOnlyElement(context.pTransform.getInputsMap().keySet()));
     }
   }
@@ -196,7 +198,7 @@ public void outputWindowedValue(
                       Collection<? extends BoundedWindow> windows,
                       PaneInfo pane) {
                     
Collection<FnDataReceiver<WindowedValue<AdditionalOutputT>>> consumers =
-                        (Collection) context.tagToConsumer.get(tag);
+                        (Collection) 
context.localNameToConsumer.get(tag.getId());
                     if (consumers == null) {
                       throw new IllegalArgumentException(
                           String.format("Unknown output tag %s", tag));
@@ -247,6 +249,12 @@ public void outputWindowedValue(
     }
   }
 
+  @Override
+  public void processTimer(
+      String timerId, TimeDomain timeDomain, WindowedValue<KV<Object, Timer>> 
input) {
+    throw new UnsupportedOperationException("Timers are unsupported in a 
SplittableDoFn.");
+  }
+
   @Override
   public void finishBundle() {
     doFnInvoker.invokeFinishBundle(finishBundleContext);
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index 4007426c17f..c76357cdf7a 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -386,7 +386,7 @@ public Object createRunnerForPTransform(
         Map<String, PCollection> pCollections,
         Map<String, Coder> coders,
         Map<String, WindowingStrategy> windowingStrategies,
-        Multimap<String, FnDataReceiver<WindowedValue<?>>> 
pCollectionIdsToConsumers,
+        ListMultimap<String, FnDataReceiver<WindowedValue<?>>> 
pCollectionIdsToConsumers,
         Consumer<ThrowingRunnable> addStartFunction,
         Consumer<ThrowingRunnable> addFinishFunction,
         BundleSplitListener splitListener) {
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
index d885d1fa005..976731151ef 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
@@ -31,11 +31,11 @@
 import static org.mockito.Mockito.when;
 
 import com.google.common.base.Suppliers;
-import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
+import com.google.common.collect.ListMultimap;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -127,7 +127,7 @@ public void testCreatingAndProcessingBeamFnDataReadRunner() 
throws Exception {
 
     List<WindowedValue<String>> outputValues = new ArrayList<>();
 
-    Multimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
HashMultimap.create();
+    ListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
ArrayListMultimap.create();
     String localOutputId = "outputPC";
     consumers.put(
         localOutputId, (FnDataReceiver) 
(FnDataReceiver<WindowedValue<String>>) outputValues::add);
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
index 0b0fbf598fc..8714a425a12 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
@@ -33,10 +33,10 @@
 import static org.mockito.Mockito.when;
 
 import com.google.common.base.Suppliers;
-import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
+import com.google.common.collect.ListMultimap;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -117,7 +117,7 @@ public void setUp() {
   public void testCreatingAndProcessingBeamFnDataWriteRunner() throws 
Exception {
     String bundleId = "57L";
 
-    Multimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
HashMultimap.create();
+    ListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
ArrayListMultimap.create();
     List<ThrowingRunnable> startFunctions = new ArrayList<>();
     List<ThrowingRunnable> finishFunctions = new ArrayList<>();
 
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
index fa8282796a6..e5567ef8abd 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
@@ -26,10 +26,10 @@
 import static org.junit.Assert.fail;
 
 import com.google.common.base.Suppliers;
-import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
+import com.google.common.collect.ListMultimap;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -126,7 +126,7 @@ public void testStart() throws Exception {
   public void testCreatingAndProcessingSourceFromFactory() throws Exception {
     List<WindowedValue<String>> outputValues = new ArrayList<>();
 
-    Multimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
HashMultimap.create();
+    ListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
ArrayListMultimap.create();
     consumers.put(
         "outputPC", (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) 
outputValues::add);
     List<ThrowingRunnable> startFunctions = new ArrayList<>();
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java
index 5cab09e503d..7ba5c7be5fa 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java
@@ -24,9 +24,9 @@
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertThat;
 
-import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
+import com.google.common.collect.ListMultimap;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -123,7 +123,7 @@ public void createPipeline() throws Exception {
   @Test
   public void testPrecombine() throws Exception {
     // Create a map of consumers and an output target to check output values.
-    Multimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
HashMultimap.create();
+    ListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
ArrayListMultimap.create();
     Deque<WindowedValue<KV<String, Integer>>> mainOutputValues = new 
ArrayDeque<>();
     consumers.put(
         Iterables.getOnlyElement(pTransform.getOutputsMap().values()),
@@ -190,7 +190,7 @@ public void testPrecombine() throws Exception {
   @Test
   public void testMergeAccumulators() throws Exception {
     // Create a map of consumers and an output target to check output values.
-    Multimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
HashMultimap.create();
+    ListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
ArrayListMultimap.create();
     Deque<WindowedValue<KV<String, Integer>>> mainOutputValues = new 
ArrayDeque<>();
     consumers.put(
         Iterables.getOnlyElement(pTransform.getOutputsMap().values()),
@@ -245,7 +245,7 @@ public void testMergeAccumulators() throws Exception {
   @Test
   public void testExtractOutputs() throws Exception {
     // Create a map of consumers and an output target to check output values.
-    Multimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
HashMultimap.create();
+    ListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
ArrayListMultimap.create();
     Deque<WindowedValue<KV<String, Integer>>> mainOutputValues = new 
ArrayDeque<>();
     consumers.put(
         Iterables.getOnlyElement(pTransform.getOutputsMap().values()),
@@ -300,7 +300,7 @@ public void testExtractOutputs() throws Exception {
   @Test
   public void testCombineGroupedValues() throws Exception {
     // Create a map of consumers and an output target to check output values.
-    Multimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
HashMultimap.create();
+    ListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
ArrayListMultimap.create();
     Deque<WindowedValue<KV<String, Integer>>> mainOutputValues = new 
ArrayDeque<>();
     consumers.put(
         Iterables.getOnlyElement(pTransform.getOutputsMap().values()),
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FlattenRunnerTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FlattenRunnerTest.java
index b9393c29e5a..3ba0495d295 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FlattenRunnerTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FlattenRunnerTest.java
@@ -26,10 +26,8 @@
 
 import com.google.common.base.Suppliers;
 import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Multimap;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -71,7 +69,7 @@ public void testCreatingAndProcessingDoFlatten() throws 
Exception {
             .build();
 
     List<WindowedValue<String>> mainOutputValues = new ArrayList<>();
-    Multimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
HashMultimap.create();
+    ListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
ArrayListMultimap.create();
     consumers.put(
         "mainOutputTarget",
         (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) 
mainOutputValues::add);
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
index df5d62ce744..5b2ee9ccee9 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.beam.fn.harness;
 
+import static 
org.apache.beam.sdk.util.WindowedValue.timestampedValueInGlobalWindow;
 import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -28,18 +29,20 @@
 import static org.junit.Assert.fail;
 
 import com.google.common.base.Suppliers;
-import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
+import com.google.common.collect.ListMultimap;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.ServiceLoader;
 import org.apache.beam.fn.harness.state.FakeBeamFnStateClient;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.PipelineTranslation;
 import org.apache.beam.runners.core.construction.SdkComponents;
@@ -52,7 +55,12 @@
 import org.apache.beam.sdk.state.CombiningState;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
 import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.testing.ResetDateTimeProvider;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -76,6 +84,7 @@
 import org.hamcrest.collection.IsMapContaining;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -84,6 +93,8 @@
 @RunWith(JUnit4.class)
 public class FnApiDoFnRunnerTest implements Serializable {
 
+  @Rule public transient ResetDateTimeProvider dateTimeProvider = new 
ResetDateTimeProvider();
+
   public static final String TEST_PTRANSFORM_ID = "pTransformId";
 
   private static class ConcatCombineFn extends CombineFn<String, String, 
String> {
@@ -173,7 +184,7 @@ public void testUsingUserState() throws Exception {
                 bagUserStateKey("combine", "X"), encode("X0")));
 
     List<WindowedValue<String>> mainOutputValues = new ArrayList<>();
-    Multimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
HashMultimap.create();
+    ListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
ArrayListMultimap.create();
     consumers.put(
         outputPCollectionId,
         (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) 
mainOutputValues::add);
@@ -332,7 +343,7 @@ public void testBasicWithSideInputsAndOutputs() throws 
Exception {
 
     List<WindowedValue<String>> mainOutputValues = new ArrayList<>();
     List<WindowedValue<String>> additionalOutputValues = new ArrayList<>();
-    Multimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
HashMultimap.create();
+    ListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
ArrayListMultimap.create();
     consumers.put(
         outputPCollectionId,
         (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) 
mainOutputValues::add);
@@ -459,7 +470,7 @@ public void testSideInputIsAccessibleForDownstreamCallers() 
throws Exception {
     FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(stateData);
 
     List<WindowedValue<Iterable<String>>> mainOutputValues = new ArrayList<>();
-    Multimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
HashMultimap.create();
+    ListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
ArrayListMultimap.create();
     consumers.put(
         Iterables.getOnlyElement(pTransform.getOutputsMap().values()),
         (FnDataReceiver) (FnDataReceiver<WindowedValue<Iterable<String>>>) 
mainOutputValues::add);
@@ -505,10 +516,223 @@ public void 
testSideInputIsAccessibleForDownstreamCallers() throws Exception {
     assertEquals(stateData, fakeClient.getData());
   }
 
+  private static class TestTimerfulDoFn extends DoFn<KV<String, String>, 
String> {
+    @TimerId("event")
+    private final TimerSpec eventTimerSpec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    @TimerId("processing")
+    private final TimerSpec processingTimerSpec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+    @ProcessElement
+    public void processElement(
+        ProcessContext context,
+        @TimerId("event") Timer eventTimeTimer,
+        @TimerId("processing") Timer processingTimeTimer) {
+      context.output("main" + context.element().getKey());
+      eventTimeTimer.set(context.timestamp().plus(1L));
+      processingTimeTimer.offset(Duration.millis(2L));
+      processingTimeTimer.setRelative();
+    }
+
+    @OnTimer("event")
+    public void eventTimer(
+        OnTimerContext context,
+        @TimerId("event") Timer eventTimeTimer,
+        @TimerId("processing") Timer processingTimeTimer) {
+      context.output("event");
+      eventTimeTimer.set(context.timestamp().plus(11L));
+      processingTimeTimer.offset(Duration.millis(12L));
+      processingTimeTimer.setRelative();
+    }
+
+    @OnTimer("processing")
+    public void processingTimer(
+        OnTimerContext context,
+        @TimerId("event") Timer eventTimeTimer,
+        @TimerId("processing") Timer processingTimeTimer) {
+      context.output("processing");
+      eventTimeTimer.set(context.timestamp().plus(21L));
+      processingTimeTimer.offset(Duration.millis(22L));
+      processingTimeTimer.setRelative();
+    }
+  }
+
+  @Test
+  public void testTimers() throws Exception {
+    dateTimeProvider.setDateTimeFixed(10000L);
+
+    Pipeline p = Pipeline.create();
+    PCollection<KV<String, String>> valuePCollection =
+        p.apply(Create.of(KV.of("unused", "unused")));
+    PCollection<String> outputPCollection =
+        valuePCollection.apply(TEST_PTRANSFORM_ID, ParDo.of(new 
TestTimerfulDoFn()));
+
+    SdkComponents sdkComponents = SdkComponents.create();
+    sdkComponents.registerEnvironment(Environment.getDefaultInstance());
+    // Note that the pipeline translation for timers creates a loop between 
the ParDo with
+    // the timer and the PCollection for that timer. This loop is unrolled by 
runners
+    // during execution which we redo here manually.
+    RunnerApi.Pipeline pProto = PipelineTranslation.toProto(p, sdkComponents);
+    String inputPCollectionId = 
sdkComponents.registerPCollection(valuePCollection);
+    String outputPCollectionId = 
sdkComponents.registerPCollection(outputPCollection);
+    String eventTimerInputPCollectionId = 
"pTransformId/ParMultiDo(TestTimerful).event";
+    String eventTimerOutputPCollectionId = 
"pTransformId/ParMultiDo(TestTimerful).event.output";
+    String processingTimerInputPCollectionId = 
"pTransformId/ParMultiDo(TestTimerful).processing";
+    String processingTimerOutputPCollectionId =
+        "pTransformId/ParMultiDo(TestTimerful).processing.output";
+
+    RunnerApi.PTransform pTransform =
+        pProto
+            .getComponents()
+            .getTransformsOrThrow(
+                
pProto.getComponents().getTransformsOrThrow(TEST_PTRANSFORM_ID).getSubtransforms(0))
+            .toBuilder()
+            // We need to re-write the "output" PCollections that a runner 
would have inserted
+            // on the way to a output sink.
+            .putOutputs("event", eventTimerOutputPCollectionId)
+            .putOutputs("processing", processingTimerOutputPCollectionId)
+            .build();
+
+    FakeBeamFnStateClient fakeClient = new 
FakeBeamFnStateClient(Collections.emptyMap());
+
+    List<WindowedValue<String>> mainOutputValues = new ArrayList<>();
+    List<WindowedValue<KV<String, Timer>>> eventTimerOutputValues = new 
ArrayList<>();
+    List<WindowedValue<KV<String, Timer>>> processingTimerOutputValues = new 
ArrayList<>();
+    ListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
ArrayListMultimap.create();
+    consumers.put(
+        outputPCollectionId,
+        (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) 
mainOutputValues::add);
+    consumers.put(
+        eventTimerOutputPCollectionId,
+        (FnDataReceiver)
+            (FnDataReceiver<WindowedValue<KV<String, Timer>>>) 
eventTimerOutputValues::add);
+    consumers.put(
+        processingTimerOutputPCollectionId,
+        (FnDataReceiver)
+            (FnDataReceiver<WindowedValue<KV<String, Timer>>>) 
processingTimerOutputValues::add);
+
+    List<ThrowingRunnable> startFunctions = new ArrayList<>();
+    List<ThrowingRunnable> finishFunctions = new ArrayList<>();
+
+    new FnApiDoFnRunner.Factory<>()
+        .createRunnerForPTransform(
+            PipelineOptionsFactory.create(),
+            null /* beamFnDataClient */,
+            fakeClient,
+            TEST_PTRANSFORM_ID,
+            pTransform,
+            Suppliers.ofInstance("57L")::get,
+            ImmutableMap.<String, RunnerApi.PCollection>builder()
+                .putAll(pProto.getComponents().getPcollectionsMap())
+                // We need to insert the "output" PCollections that a runner 
would have inserted
+                // on the way to a output sink.
+                .put(
+                    eventTimerOutputPCollectionId,
+                    
pProto.getComponents().getPcollectionsOrThrow(eventTimerInputPCollectionId))
+                .put(
+                    processingTimerOutputPCollectionId,
+                    pProto
+                        .getComponents()
+                        
.getPcollectionsOrThrow(processingTimerInputPCollectionId))
+                .build(),
+            pProto.getComponents().getCodersMap(),
+            pProto.getComponents().getWindowingStrategiesMap(),
+            consumers,
+            startFunctions::add,
+            finishFunctions::add,
+            null /* splitListener */);
+
+    Iterables.getOnlyElement(startFunctions).run();
+    mainOutputValues.clear();
+
+    assertThat(
+        consumers.keySet(),
+        containsInAnyOrder(
+            inputPCollectionId,
+            outputPCollectionId,
+            eventTimerInputPCollectionId,
+            eventTimerOutputPCollectionId,
+            processingTimerInputPCollectionId,
+            processingTimerOutputPCollectionId));
+
+    // Ensure that bag user state that is initially empty or populated works.
+    // Ensure that the key order does not matter when we traverse over KV 
pairs.
+    FnDataReceiver<WindowedValue<?>> mainInput =
+        Iterables.getOnlyElement(consumers.get(inputPCollectionId));
+    FnDataReceiver<WindowedValue<?>> eventTimerInput =
+        Iterables.getOnlyElement(consumers.get(eventTimerInputPCollectionId));
+    FnDataReceiver<WindowedValue<?>> processingTimerInput =
+        
Iterables.getOnlyElement(consumers.get(processingTimerInputPCollectionId));
+    mainInput.accept(timestampedValueInGlobalWindow(KV.of("X", "X1"), new 
Instant(1000L)));
+    mainInput.accept(timestampedValueInGlobalWindow(KV.of("Y", "Y1"), new 
Instant(1100L)));
+    mainInput.accept(timestampedValueInGlobalWindow(KV.of("X", "X2"), new 
Instant(1200L)));
+    mainInput.accept(timestampedValueInGlobalWindow(KV.of("Y", "Y2"), new 
Instant(1300L)));
+    eventTimerInput.accept(timerInGlobalWindow("A", new Instant(1400L), new 
Instant(2400L)));
+    eventTimerInput.accept(timerInGlobalWindow("B", new Instant(1500L), new 
Instant(2500L)));
+    eventTimerInput.accept(timerInGlobalWindow("A", new Instant(1600L), new 
Instant(2600L)));
+    processingTimerInput.accept(timerInGlobalWindow("C", new Instant(1700L), 
new Instant(2700L)));
+    processingTimerInput.accept(timerInGlobalWindow("D", new Instant(1800L), 
new Instant(2800L)));
+    processingTimerInput.accept(timerInGlobalWindow("C", new Instant(1900L), 
new Instant(2900L)));
+    assertThat(
+        mainOutputValues,
+        contains(
+            timestampedValueInGlobalWindow("mainX", new Instant(1000L)),
+            timestampedValueInGlobalWindow("mainY", new Instant(1100L)),
+            timestampedValueInGlobalWindow("mainX", new Instant(1200L)),
+            timestampedValueInGlobalWindow("mainY", new Instant(1300L)),
+            timestampedValueInGlobalWindow("event", new Instant(1400L)),
+            timestampedValueInGlobalWindow("event", new Instant(1500L)),
+            timestampedValueInGlobalWindow("event", new Instant(1600L)),
+            timestampedValueInGlobalWindow("processing", new Instant(1700L)),
+            timestampedValueInGlobalWindow("processing", new Instant(1800L)),
+            timestampedValueInGlobalWindow("processing", new Instant(1900L))));
+    assertThat(
+        eventTimerOutputValues,
+        contains(
+            timerInGlobalWindow("X", new Instant(1000L), new Instant(1001L)),
+            timerInGlobalWindow("Y", new Instant(1100L), new Instant(1101L)),
+            timerInGlobalWindow("X", new Instant(1200L), new Instant(1201L)),
+            timerInGlobalWindow("Y", new Instant(1300L), new Instant(1301L)),
+            timerInGlobalWindow("A", new Instant(1400L), new Instant(1411L)),
+            timerInGlobalWindow("B", new Instant(1500L), new Instant(1511L)),
+            timerInGlobalWindow("A", new Instant(1600L), new Instant(1611L)),
+            timerInGlobalWindow("C", new Instant(1700L), new Instant(1721L)),
+            timerInGlobalWindow("D", new Instant(1800L), new Instant(1821L)),
+            timerInGlobalWindow("C", new Instant(1900L), new Instant(1921L))));
+    assertThat(
+        processingTimerOutputValues,
+        contains(
+            timerInGlobalWindow("X", new Instant(1000L), new Instant(10002L)),
+            timerInGlobalWindow("Y", new Instant(1100L), new Instant(10002L)),
+            timerInGlobalWindow("X", new Instant(1200L), new Instant(10002L)),
+            timerInGlobalWindow("Y", new Instant(1300L), new Instant(10002L)),
+            timerInGlobalWindow("A", new Instant(1400L), new Instant(10012L)),
+            timerInGlobalWindow("B", new Instant(1500L), new Instant(10012L)),
+            timerInGlobalWindow("A", new Instant(1600L), new Instant(10012L)),
+            timerInGlobalWindow("C", new Instant(1700L), new Instant(10022L)),
+            timerInGlobalWindow("D", new Instant(1800L), new Instant(10022L)),
+            timerInGlobalWindow("C", new Instant(1900L), new 
Instant(10022L))));
+    mainOutputValues.clear();
+
+    Iterables.getOnlyElement(finishFunctions).run();
+    assertThat(mainOutputValues, empty());
+
+    assertEquals(ImmutableMap.of(), fakeClient.getData());
+    mainOutputValues.clear();
+  }
+
   private <T> WindowedValue<T> valueInWindow(T value, BoundedWindow window) {
     return WindowedValue.of(value, window.maxTimestamp(), window, 
PaneInfo.ON_TIME_AND_ONLY_FIRING);
   }
 
+  private <T>
+      WindowedValue<KV<T, org.apache.beam.runners.core.construction.Timer>> 
timerInGlobalWindow(
+          T value, Instant valueTimestamp, Instant scheduledTimestamp) {
+    return timestampedValueInGlobalWindow(
+        KV.of(value, 
org.apache.beam.runners.core.construction.Timer.of(scheduledTimestamp)),
+        valueTimestamp);
+  }
+
   /**
    * Produces a multimap side input {@link StateKey} for the test PTransform 
id in the global
    * window.
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnersTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnersTest.java
index 93ca8086de6..34336c1bb35 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnersTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnersTest.java
@@ -25,10 +25,10 @@
 import static org.junit.Assert.assertThat;
 
 import com.google.common.base.Suppliers;
-import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
+import com.google.common.collect.ListMultimap;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -62,7 +62,7 @@
   @Test
   public void testValueOnlyMapping() throws Exception {
     List<WindowedValue<?>> outputConsumer = new ArrayList<>();
-    Multimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
HashMultimap.create();
+    ListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
ArrayListMultimap.create();
     consumers.put("outputPC", outputConsumer::add);
 
     List<ThrowingRunnable> startFunctions = new ArrayList<>();
@@ -98,7 +98,7 @@ public void testValueOnlyMapping() throws Exception {
   @Test
   public void testFullWindowedValueMapping() throws Exception {
     List<WindowedValue<?>> outputConsumer = new ArrayList<>();
-    Multimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
HashMultimap.create();
+    ListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
ArrayListMultimap.create();
     consumers.put("outputPC", outputConsumer::add);
 
     List<ThrowingRunnable> startFunctions = new ArrayList<>();
@@ -133,7 +133,7 @@ public void testFullWindowedValueMapping() throws Exception 
{
   @Test
   public void testFullWindowedValueMappingWithCompressedWindow() throws 
Exception {
     List<WindowedValue<?>> outputConsumer = new ArrayList<>();
-    Multimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
HashMultimap.create();
+    ListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
ArrayListMultimap.create();
     consumers.put("outputPC", outputConsumer::add);
 
     List<ThrowingRunnable> startFunctions = new ArrayList<>();
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
index d08eb326ec3..a8f0ff0db52 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
@@ -27,7 +27,7 @@
 import static org.mockito.Mockito.when;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Multimap;
+import com.google.common.collect.ListMultimap;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -365,7 +365,8 @@ public Object createRunnerForPTransform(
                       Map<String, PCollection> pCollections,
                       Map<String, Coder> coders,
                       Map<String, WindowingStrategy> windowingStrategies,
-                      Multimap<String, FnDataReceiver<WindowedValue<?>>> 
pCollectionIdsToConsumers,
+                      ListMultimap<String, FnDataReceiver<WindowedValue<?>>>
+                          pCollectionIdsToConsumers,
                       Consumer<ThrowingRunnable> addStartFunction,
                       Consumer<ThrowingRunnable> addFinishFunction,
                       BundleSplitListener splitListener)
@@ -426,7 +427,8 @@ public Object createRunnerForPTransform(
                       Map<String, PCollection> pCollections,
                       Map<String, Coder> coders,
                       Map<String, WindowingStrategy> windowingStrategies,
-                      Multimap<String, FnDataReceiver<WindowedValue<?>>> 
pCollectionIdsToConsumers,
+                      ListMultimap<String, FnDataReceiver<WindowedValue<?>>>
+                          pCollectionIdsToConsumers,
                       Consumer<ThrowingRunnable> addStartFunction,
                       Consumer<ThrowingRunnable> addFinishFunction,
                       BundleSplitListener splitListener)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 125236)
    Time Spent: 3h 40m  (was: 3.5h)

> Java SDK harness should support user timers
> -------------------------------------------
>
>                 Key: BEAM-4653
>                 URL: https://issues.apache.org/jira/browse/BEAM-4653
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-java-harness
>            Reporter: Luke Cwik
>            Assignee: Luke Cwik
>            Priority: Major
>              Labels: portability
>          Time Spent: 3h 40m
>  Remaining Estimate: 0h
>
> Wire up the onTimer method in the Java SDK harness FnApiDoFnRunner connecting 
> it to the RemoteGrpcPort read/write that is responsible for 
> producing/consumer timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to