Add timer support to DoFnRunner(s)

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8af13b01
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8af13b01
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8af13b01

Branch: refs/heads/gearpump-runner
Commit: 8af13b0102cda6c68601efa4119723900d12ca5c
Parents: c1e1017
Author: Kenneth Knowles <k...@google.com>
Authored: Wed Nov 23 14:21:40 2016 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Fri Dec 16 20:14:19 2016 -0800

----------------------------------------------------------------------
 .../apache/beam/runners/core/DoFnRunner.java    |   9 +
 .../core/LateDataDroppingDoFnRunner.java        |   7 +
 .../core/PushbackSideInputDoFnRunner.java       |   8 +
 .../beam/runners/core/SimpleDoFnRunner.java     | 236 +++++++++++++++++-
 .../beam/runners/core/SimpleOldDoFnRunner.java  |   8 +
 .../core/PushbackSideInputDoFnRunnerTest.java   |  41 +++
 .../beam/runners/core/SimpleDoFnRunnerTest.java | 247 +++++++++++++++++++
 7 files changed, 555 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8af13b01/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
index 501667e..7c73a34 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
@@ -20,8 +20,11 @@ package org.apache.beam.runners.core;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
+import org.joda.time.Instant;
 
 /**
  * An wrapper interface that represents the execution of a {@link DoFn}.
@@ -39,6 +42,12 @@ public interface DoFnRunner<InputT, OutputT> {
   void processElement(WindowedValue<InputT> elem);
 
   /**
+   * Calls a {@link DoFn DoFn's} {@link DoFn.OnTimer @OnTimer} method for the 
given timer
+   * in the given window.
+   */
+  void onTimer(String timerId, BoundedWindow window, Instant timestamp, 
TimeDomain timeDomain);
+
+  /**
    * Calls a {@link DoFn DoFn's} {@link DoFn.FinishBundle @FinishBundle} 
method and performs
    * additional tasks, such as flushing in-memory states.
    */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8af13b01/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index 9bfe9ae..290171a 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Iterables;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -73,6 +74,12 @@ public class LateDataDroppingDoFnRunner<K, InputT, OutputT, 
W extends BoundedWin
   }
 
   @Override
+  public void onTimer(String timerId, BoundedWindow window, Instant timestamp,
+      TimeDomain timeDomain) {
+    doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
+  }
+
+  @Override
   public void finishBundle() {
     doFnRunner.finishBundle();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8af13b01/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
index 0bb9153..2962832 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
@@ -25,8 +25,10 @@ import java.util.HashSet;
 import java.util.Set;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.joda.time.Instant;
 
 /**
  * A {@link DoFnRunner} that can refuse to process elements that are not 
ready, instead returning
@@ -109,6 +111,12 @@ public class PushbackSideInputDoFnRunner<InputT, OutputT> 
implements DoFnRunner<
     underlying.processElement(elem);
   }
 
+  @Override
+  public void onTimer(String timerId, BoundedWindow window, Instant timestamp,
+      TimeDomain timeDomain) {
+    underlying.onTimer(timerId, window, timestamp, timeDomain);
+  }
+
   /**
    * Call the underlying {@link DoFnRunner#finishBundle()}.
    */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8af13b01/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 29ef3ef..a7d82bf 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -50,8 +50,10 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.ExecutionContext.StepContext;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.TimerSpec;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingInternals;
@@ -64,6 +66,7 @@ import org.apache.beam.sdk.util.state.StateSpec;
 import org.apache.beam.sdk.util.state.StateTags;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.joda.time.format.PeriodFormat;
 
@@ -161,6 +164,35 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     }
   }
 
+  @Override
+  public void onTimer(
+      String timerId, BoundedWindow window, Instant timestamp, TimeDomain 
timeDomain) {
+
+    // The effective timestamp is the timestamp given to derived elements when none is
+    // otherwise specified. If this is an event time timer, then it is the timestamp of the
+    // timer itself. Otherwise, it is the current input watermark, which is by definition
+    // non-late.
+    Instant effectiveTimestamp;
+    switch (timeDomain) {
+      case EVENT_TIME:
+        effectiveTimestamp = timestamp;
+        break;
+
+      case PROCESSING_TIME:
+      case SYNCHRONIZED_PROCESSING_TIME:
+        effectiveTimestamp = 
context.stepContext.timerInternals().currentInputWatermarkTime();
+        break;
+
+      default:
+        throw new IllegalArgumentException(
+            String.format("Unknown time domain: %s", timeDomain));
+    }
+
+    OnTimerArgumentProvider<InputT, OutputT> argumentProvider =
+        new OnTimerArgumentProvider<>(fn, context, window, effectiveTimestamp, 
timeDomain);
+    invoker.invokeOnTimer(timerId, argumentProvider);
+  }
+
   private void invokeProcessElement(WindowedValue<InputT> elem) {
     final DoFnProcessContext<InputT, OutputT> processContext = 
createProcessContext(elem);
 
@@ -630,7 +662,13 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
 
     @Override
     public Timer timer(String timerId) {
-      throw new UnsupportedOperationException("Timer parameters are not 
supported.");
+      try {
+        TimerSpec spec =
+            (TimerSpec) 
signature.timerDeclarations().get(timerId).field().get(fn);
+        return new TimerInternalsTimer(getNamespace(), timerId, spec, 
stepContext.timerInternals());
+      } catch (IllegalAccessException e) {
+        throw new RuntimeException(e);
+      }
     }
 
     @Override
@@ -682,5 +720,201 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
         }
       };
     }
+
+  }
+
+  /**
+   * A concrete implementation of {@link DoFnInvoker.ArgumentProvider} used 
for running a {@link
+   * DoFn} on a timer.
+   *
+   * @param <InputT> the type of the {@link DoFn} (main) input elements
+   * @param <OutputT> the type of the {@link DoFn} (main) output elements
+   */
+  private class OnTimerArgumentProvider<InputT, OutputT>
+      extends DoFn<InputT, OutputT>.OnTimerContext
+      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
+
+    final DoFn<InputT, OutputT> fn;
+    final DoFnContext<InputT, OutputT> context;
+    private final BoundedWindow window;
+    private final Instant timestamp;
+    private final TimeDomain timeDomain;
+
+    /** Lazily initialized; should only be accessed via {@link 
#getNamespace()}. */
+    private StateNamespace namespace;
+
+    /**
+     * The state namespace for this context.
+     *
+     * <p>A timer fires in exactly one window, so this is always the window namespace of
+     * the {@code window} this provider was constructed with. It is computed on the first
+     * call and cached for subsequent calls.
+     */
+    private StateNamespace getNamespace() {
+      if (namespace == null) {
+        namespace = StateNamespaces.window(windowCoder, window);
+      }
+      return namespace;
+    }
+
+    private OnTimerArgumentProvider(
+        DoFn<InputT, OutputT> fn,
+        DoFnContext<InputT, OutputT> context,
+        BoundedWindow window,
+        Instant timestamp,
+        TimeDomain timeDomain) {
+      fn.super();
+      this.fn = fn;
+      this.context = context;
+      this.window = window;
+      this.timestamp = timestamp;
+      this.timeDomain = timeDomain;
+    }
+
+    @Override
+    public Instant timestamp() {
+      return timestamp;
+    }
+
+    @Override
+    public BoundedWindow window() {
+      return window;
+    }
+
+    @Override
+    public TimeDomain timeDomain() {
+      return timeDomain;
+    }
+
+    @Override
+    public Context context(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException("Context parameters are not 
supported.");
+    }
+
+    @Override
+    public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException("ProcessContext parameters are 
not supported.");
+    }
+
+    @Override
+    public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
+      return this;
+    }
+
+    @Override
+    public InputProvider<InputT> inputProvider() {
+      throw new UnsupportedOperationException("InputProvider parameters are 
not supported.");
+    }
+
+    @Override
+    public OutputReceiver<OutputT> outputReceiver() {
+      throw new UnsupportedOperationException("OutputReceiver parameters are 
not supported.");
+    }
+
+    @Override
+    public <RestrictionT> RestrictionTracker<RestrictionT> 
restrictionTracker() {
+      throw new UnsupportedOperationException("RestrictionTracker parameters 
are not supported.");
+    }
+
+    @Override
+    public State state(String stateId) {
+      try {
+        StateSpec<?, ?> spec =
+            (StateSpec<?, ?>) 
signature.stateDeclarations().get(stateId).field().get(fn);
+        return stepContext
+            .stateInternals()
+            .state(getNamespace(), StateTags.tagForSpec(stateId, (StateSpec) 
spec));
+      } catch (IllegalAccessException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public Timer timer(String timerId) {
+      try {
+        TimerSpec spec =
+            (TimerSpec) 
signature.timerDeclarations().get(timerId).field().get(fn);
+        return new TimerInternalsTimer(getNamespace(), timerId, spec, 
stepContext.timerInternals());
+      } catch (IllegalAccessException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return context.getPipelineOptions();
+    }
+
+    @Override
+    public void output(OutputT output) {
+      context.outputWithTimestamp(output, timestamp);
+    }
+
+    @Override
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      context.outputWithTimestamp(output, timestamp);
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      context.sideOutputWithTimestamp(tag, output, timestamp);
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
+      context.sideOutputWithTimestamp(tag, output, timestamp);
+    }
+
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> 
createAggregator(
+        String name,
+        CombineFn<AggInputT, ?, AggOutputT> combiner) {
+      throw new UnsupportedOperationException("Cannot createAggregator in 
@OnTimer method");
+    }
+
+    @Override
+    public WindowingInternals<InputT, OutputT> windowingInternals() {
+      throw new UnsupportedOperationException("WindowingInternals are 
unsupported.");
+    }
+  }
+
+  private static class TimerInternalsTimer implements Timer {
+    private final TimerInternals timerInternals;
+    private final String timerId;
+    private final TimerSpec spec;
+    private final StateNamespace namespace;
+
+    public TimerInternalsTimer(
+        StateNamespace namespace, String timerId, TimerSpec spec, 
TimerInternals timerInternals) {
+      this.namespace = namespace;
+      this.timerId = timerId;
+      this.spec = spec;
+      this.timerInternals = timerInternals;
+    }
+
+    @Override
+    public void setForNowPlus(Duration durationFromNow) {
+      timerInternals.setTimer(
+          namespace, timerId, getCurrentTime().plus(durationFromNow), 
spec.getTimeDomain());
+    }
+
+    @Override
+    public void cancel() {
+      timerInternals.deleteTimer(namespace, timerId);
+    }
+
+    private Instant getCurrentTime() {
+      switch(spec.getTimeDomain()) {
+        case EVENT_TIME:
+          return timerInternals.currentInputWatermarkTime();
+        case PROCESSING_TIME:
+          return timerInternals.currentProcessingTime();
+        case SYNCHRONIZED_PROCESSING_TIME:
+          return timerInternals.currentSynchronizedProcessingTime();
+        default:
+          throw new IllegalStateException(
+              String.format("Timer created for unknown time domain %s", 
spec.getTimeDomain()));
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8af13b01/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
index 1048fdc..342a4a8 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
@@ -39,6 +39,7 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.ExecutionContext.StepContext;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -107,6 +108,13 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, OutputT
     }
   }
 
+  @Override
+  public void onTimer(String timerId, BoundedWindow window, Instant timestamp,
+      TimeDomain timeDomain) {
+    throw new UnsupportedOperationException(
+        String.format("Timers are not supported by %s", 
OldDoFn.class.getSimpleName()));
+  }
+
   private void invokeProcessElement(WindowedValue<InputT> elem) {
     final OldDoFn<InputT, OutputT>.ProcessContext processContext = 
createProcessContext(elem);
     // This can contain user code. Wrap it in case it throws an exception.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8af13b01/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
index 176ab26..a1cdbf6 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.core;
 
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
@@ -37,7 +38,10 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.IdentitySideInputWindowFn;
 import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.state.StateNamespaces;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.hamcrest.Matchers;
@@ -215,8 +219,33 @@ public class PushbackSideInputDoFnRunnerTest {
     assertThat(underlying.inputElems, containsInAnyOrder(multiWindow));
   }
 
+  /** Tests that a call to onTimer gets delegated. */
+  @Test
+  public void testOnTimerCalled() {
+    PushbackSideInputDoFnRunner<Integer, Integer> runner =
+        createRunner(ImmutableList.<PCollectionView<?>>of());
+
+    String timerId = "fooTimer";
+    IntervalWindow window = new IntervalWindow(new Instant(4), new 
Instant(16));
+    Instant timestamp = new Instant(72);
+
+    // Mocking is not easily compatible with annotation analysis, so we 
manually record
+    // the method call.
+    runner.onTimer(timerId, window, new Instant(timestamp), 
TimeDomain.EVENT_TIME);
+
+    assertThat(
+        underlying.firedTimers,
+        contains(
+            TimerData.of(
+                timerId,
+                StateNamespaces.window(IntervalWindow.getCoder(), window),
+                timestamp,
+                TimeDomain.EVENT_TIME)));
+  }
+
   private static class TestDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, OutputT> {
     List<WindowedValue<InputT>> inputElems;
+    List<TimerData> firedTimers;
     private boolean started = false;
     private boolean finished = false;
 
@@ -224,6 +253,7 @@ public class PushbackSideInputDoFnRunnerTest {
     public void startBundle() {
       started = true;
       inputElems = new ArrayList<>();
+      firedTimers = new ArrayList<>();
     }
 
     @Override
@@ -232,6 +262,17 @@ public class PushbackSideInputDoFnRunnerTest {
     }
 
     @Override
+    public void onTimer(String timerId, BoundedWindow window, Instant 
timestamp,
+        TimeDomain timeDomain) {
+      firedTimers.add(
+          TimerData.of(
+              timerId,
+              StateNamespaces.window(IntervalWindow.getCoder(), 
(IntervalWindow) window),
+              timestamp,
+              timeDomain));
+    }
+
+    @Override
     public void finishBundle() {
       finished = true;
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8af13b01/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
new file mode 100644
index 0000000..f068c19
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.BaseExecutionContext.StepContext;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.Timer;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.TimerSpec;
+import org.apache.beam.sdk.util.TimerSpecs;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link SimpleDoFnRunner}. */
+@RunWith(JUnit4.class)
+public class SimpleDoFnRunnerTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Mock StepContext mockStepContext;
+
+  @Mock TimerInternals mockTimerInternals;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    when(mockStepContext.timerInternals()).thenReturn(mockTimerInternals);
+  }
+
+  @Test
+  public void testProcessElementExceptionsWrappedAsUserCodeException() {
+    ThrowingDoFn fn = new ThrowingDoFn();
+    DoFnRunner<String, String> runner =
+        new SimpleDoFnRunner<>(
+            null,
+            fn,
+            null,
+            null,
+            null,
+            Collections.<TupleTag<?>>emptyList(),
+            mockStepContext,
+            null,
+            WindowingStrategy.of(new GlobalWindows()));
+
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(is(fn.exceptionToThrow));
+
+    runner.processElement(WindowedValue.valueInGlobalWindow("anyValue"));
+  }
+
+  @Test
+  public void testOnTimerExceptionsWrappedAsUserCodeException() {
+    ThrowingDoFn fn = new ThrowingDoFn();
+    DoFnRunner<String, String> runner =
+        new SimpleDoFnRunner<>(
+            null,
+            fn,
+            null,
+            null,
+            null,
+            Collections.<TupleTag<?>>emptyList(),
+            mockStepContext,
+            null,
+            WindowingStrategy.of(new GlobalWindows()));
+
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(is(fn.exceptionToThrow));
+
+    runner.onTimer(
+        ThrowingDoFn.TIMER_ID,
+        GlobalWindow.INSTANCE,
+        new Instant(0),
+        TimeDomain.EVENT_TIME);
+  }
+
+  /**
+   * Tests that a user's call to set a timer gets properly dispatched to the 
timer internals. From
+   * there on, it is the duty of the runner & step context to set it in 
whatever way is right for
+   * that runner.
+   */
+  @Test
+  public void testTimerSet() {
+    WindowFn<?, ?> windowFn = new GlobalWindows();
+    DoFnWithTimers<GlobalWindow> fn = new 
DoFnWithTimers(windowFn.windowCoder());
+    DoFnRunner<String, String> runner =
+        new SimpleDoFnRunner<>(
+            null,
+            fn,
+            null,
+            null,
+            null,
+            Collections.<TupleTag<?>>emptyList(),
+            mockStepContext,
+            null,
+            WindowingStrategy.of(new GlobalWindows()));
+
+    // Setting the timer needs the current time, as it is set relative to the current time
+    Instant currentTime = new Instant(42);
+    
when(mockTimerInternals.currentInputWatermarkTime()).thenReturn(currentTime);
+
+    runner.processElement(WindowedValue.valueInGlobalWindow("anyValue"));
+
+    verify(mockTimerInternals)
+        .setTimer(
+            StateNamespaces.window(new GlobalWindows().windowCoder(), 
GlobalWindow.INSTANCE),
+            DoFnWithTimers.TIMER_ID,
+            currentTime.plus(DoFnWithTimers.TIMER_OFFSET),
+            TimeDomain.EVENT_TIME);
+  }
+
+  /**
+   * Tests that {@link SimpleDoFnRunner#onTimer} properly dispatches to the 
underlying
+   * {@link DoFn}.
+   */
+  @Test
+  public void testOnTimerCalled() {
+    WindowFn<?, GlobalWindow> windowFn = new GlobalWindows();
+    DoFnWithTimers<GlobalWindow> fn = new 
DoFnWithTimers(windowFn.windowCoder());
+    DoFnRunner<String, String> runner =
+        new SimpleDoFnRunner<>(
+            null,
+            fn,
+            null,
+            null,
+            null,
+            Collections.<TupleTag<?>>emptyList(),
+            mockStepContext,
+            null,
+            WindowingStrategy.of(windowFn));
+
+    Instant currentTime = new Instant(42);
+    Duration offset = Duration.millis(37);
+
+    // Mocking is not easily compatible with annotation analysis, so we 
manually record
+    // the method call.
+    runner.onTimer(
+        DoFnWithTimers.TIMER_ID,
+        GlobalWindow.INSTANCE,
+        currentTime.plus(offset),
+        TimeDomain.EVENT_TIME);
+
+    assertThat(
+        fn.onTimerInvocations,
+        contains(
+            TimerData.of(
+                DoFnWithTimers.TIMER_ID,
+                StateNamespaces.window(windowFn.windowCoder(), 
GlobalWindow.INSTANCE),
+                currentTime.plus(offset),
+                TimeDomain.EVENT_TIME)));
+  }
+
+  static class ThrowingDoFn extends DoFn<String, String> {
+    final Exception exceptionToThrow = new 
UnsupportedOperationException("Expected exception");
+
+    static final String TIMER_ID = "throwingTimerId";
+
+    @TimerId(TIMER_ID)
+    private static final TimerSpec timer = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      throw exceptionToThrow;
+    }
+
+    @OnTimer(TIMER_ID)
+    public void onTimer(OnTimerContext context) throws Exception {
+      throw exceptionToThrow;
+    }
+  }
+
+  private static class DoFnWithTimers<W extends BoundedWindow> extends 
DoFn<String, String> {
+    static final String TIMER_ID = "testTimerId";
+
+    static final Duration TIMER_OFFSET = Duration.millis(100);
+
+    private final Coder<W> windowCoder;
+
+    // Mutable
+    List<TimerData> onTimerInvocations;
+
+    DoFnWithTimers(Coder<W> windowCoder) {
+      this.windowCoder = windowCoder;
+      this.onTimerInvocations = new ArrayList<>();
+    }
+
+    @TimerId(TIMER_ID)
+    private static final TimerSpec timer = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    @ProcessElement
+    public void process(ProcessContext context, @TimerId(TIMER_ID) Timer 
timer) {
+      timer.setForNowPlus(TIMER_OFFSET);
+    }
+
+    @OnTimer(TIMER_ID)
+    public void onTimer(OnTimerContext context) {
+      onTimerInvocations.add(
+          TimerData.of(
+              DoFnWithTimers.TIMER_ID,
+              StateNamespaces.window(windowCoder, (W) context.window()),
+              context.timestamp(),
+              context.timeDomain()));
+    }
+  }
+}

Reply via email to