Add timer support to DoFnRunner(s)
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8af13b01 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8af13b01 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8af13b01 Branch: refs/heads/gearpump-runner Commit: 8af13b0102cda6c68601efa4119723900d12ca5c Parents: c1e1017 Author: Kenneth Knowles <k...@google.com> Authored: Wed Nov 23 14:21:40 2016 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Fri Dec 16 20:14:19 2016 -0800 ---------------------------------------------------------------------- .../apache/beam/runners/core/DoFnRunner.java | 9 + .../core/LateDataDroppingDoFnRunner.java | 7 + .../core/PushbackSideInputDoFnRunner.java | 8 + .../beam/runners/core/SimpleDoFnRunner.java | 236 +++++++++++++++++- .../beam/runners/core/SimpleOldDoFnRunner.java | 8 + .../core/PushbackSideInputDoFnRunnerTest.java | 41 +++ .../beam/runners/core/SimpleDoFnRunnerTest.java | 247 +++++++++++++++++++ 7 files changed, 555 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8af13b01/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java index 501667e..7c73a34 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java @@ -20,8 +20,11 @@ package org.apache.beam.runners.core; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; 
+import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; +import org.joda.time.Instant; /** * A wrapper interface that represents the execution of a {@link DoFn}. @@ -39,6 +42,12 @@ public interface DoFnRunner<InputT, OutputT> { void processElement(WindowedValue<InputT> elem); /** + * Calls a {@link DoFn DoFn's} {@link DoFn.OnTimer @OnTimer} method for the given timer + * in the given window. + */ + void onTimer(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain); + + /** * Calls a {@link DoFn DoFn's} {@link DoFn.FinishBundle @FinishBundle} method and performs * additional tasks, such as flushing in-memory states. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8af13b01/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java index 9bfe9ae..290171a 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java @@ -24,6 +24,7 @@ import com.google.common.collect.Iterables; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; @@ -73,6 +74,12 @@ public class LateDataDroppingDoFnRunner<K, InputT, OutputT, W extends BoundedWin } @Override + public void onTimer(String timerId, BoundedWindow window, Instant timestamp, 
TimeDomain timeDomain) { + doFnRunner.onTimer(timerId, window, timestamp, timeDomain); + } + + @Override public void finishBundle() { doFnRunner.finishBundle(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8af13b01/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java index 0bb9153..2962832 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java @@ -25,8 +25,10 @@ import java.util.HashSet; import java.util.Set; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; +import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollectionView; +import org.joda.time.Instant; /** * A {@link DoFnRunner} that can refuse to process elements that are not ready, instead returning @@ -109,6 +111,12 @@ public class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner< underlying.processElement(elem); } + @Override + public void onTimer(String timerId, BoundedWindow window, Instant timestamp, + TimeDomain timeDomain) { + underlying.onTimer(timerId, window, timestamp, timeDomain); + } + /** * Call the underlying {@link DoFnRunner#finishBundle()}. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8af13b01/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 29ef3ef..a7d82bf 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -50,8 +50,10 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.ExecutionContext.StepContext; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.SystemDoFnInternal; +import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.TimerSpec; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingInternals; @@ -64,6 +66,7 @@ import org.apache.beam.sdk.util.state.StateSpec; import org.apache.beam.sdk.util.state.StateTags; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Duration; import org.joda.time.Instant; import org.joda.time.format.PeriodFormat; @@ -161,6 +164,35 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out } } + @Override + public void onTimer( + String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) { + + // The effective timestamp is when derived elements will have their timestamp set, if not + // otherwise specified. If this is an event time timer, then they have the timestamp of the + // timer itself. 
Otherwise, they are set to the current input watermark, which is by definition + // non-late. + Instant effectiveTimestamp; + switch (timeDomain) { + case EVENT_TIME: + effectiveTimestamp = timestamp; + break; + + case PROCESSING_TIME: + case SYNCHRONIZED_PROCESSING_TIME: + effectiveTimestamp = context.stepContext.timerInternals().currentInputWatermarkTime(); + break; + + default: + throw new IllegalArgumentException( + String.format("Unknown time domain: %s", timeDomain)); + } + + OnTimerArgumentProvider<InputT, OutputT> argumentProvider = + new OnTimerArgumentProvider<>(fn, context, window, effectiveTimestamp, timeDomain); + invoker.invokeOnTimer(timerId, argumentProvider); + } + private void invokeProcessElement(WindowedValue<InputT> elem) { final DoFnProcessContext<InputT, OutputT> processContext = createProcessContext(elem); @@ -630,7 +662,13 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out @Override public Timer timer(String timerId) { - throw new UnsupportedOperationException("Timer parameters are not supported."); + try { + TimerSpec spec = + (TimerSpec) signature.timerDeclarations().get(timerId).field().get(fn); + return new TimerInternalsTimer(getNamespace(), timerId, spec, stepContext.timerInternals()); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } } @Override @@ -682,5 +720,201 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out } }; } + + } + + /** + * A concrete implementation of {@link DoFnInvoker.ArgumentProvider} used for running a {@link + * DoFn} on a timer. 
+ * + * @param <InputT> the type of the {@link DoFn} (main) input elements + * @param <OutputT> the type of the {@link DoFn} (main) output elements + */ + private class OnTimerArgumentProvider<InputT, OutputT> + extends DoFn<InputT, OutputT>.OnTimerContext + implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { + + final DoFn<InputT, OutputT> fn; + final DoFnContext<InputT, OutputT> context; + private final BoundedWindow window; + private final Instant timestamp; + private final TimeDomain timeDomain; + + /** Lazily initialized; should only be accessed via {@link #getNamespace()}. */ + private StateNamespace namespace; + + /** + * The state namespace for this context. + * + * <p>Any call to {@link #getNamespace()} when more than one window is present will crash; this + * represents a bug in the runner or the {@link DoFnSignature}, since values must be in exactly + * one window when state or timers are relevant. + */ + private StateNamespace getNamespace() { + if (namespace == null) { + namespace = StateNamespaces.window(windowCoder, window); + } + return namespace; + } + + private OnTimerArgumentProvider( + DoFn<InputT, OutputT> fn, + DoFnContext<InputT, OutputT> context, + BoundedWindow window, + Instant timestamp, + TimeDomain timeDomain) { + fn.super(); + this.fn = fn; + this.context = context; + this.window = window; + this.timestamp = timestamp; + this.timeDomain = timeDomain; + } + + @Override + public Instant timestamp() { + return timestamp; + } + + @Override + public BoundedWindow window() { + return window; + } + + @Override + public TimeDomain timeDomain() { + return timeDomain; + } + + @Override + public Context context(DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException("Context parameters are not supported."); + } + + @Override + public ProcessContext processContext(DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException("ProcessContext parameters are not supported."); + } + + @Override + public OnTimerContext 
onTimerContext(DoFn<InputT, OutputT> doFn) { + return this; + } + + @Override + public InputProvider<InputT> inputProvider() { + throw new UnsupportedOperationException("InputProvider parameters are not supported."); + } + + @Override + public OutputReceiver<OutputT> outputReceiver() { + throw new UnsupportedOperationException("OutputReceiver parameters are not supported."); + } + + @Override + public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() { + throw new UnsupportedOperationException("RestrictionTracker parameters are not supported."); + } + + @Override + public State state(String stateId) { + try { + StateSpec<?, ?> spec = + (StateSpec<?, ?>) signature.stateDeclarations().get(stateId).field().get(fn); + return stepContext + .stateInternals() + .state(getNamespace(), StateTags.tagForSpec(stateId, (StateSpec) spec)); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + } + + @Override + public Timer timer(String timerId) { + try { + TimerSpec spec = + (TimerSpec) signature.timerDeclarations().get(timerId).field().get(fn); + return new TimerInternalsTimer(getNamespace(), timerId, spec, stepContext.timerInternals()); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + } + + @Override + public PipelineOptions getPipelineOptions() { + return context.getPipelineOptions(); + } + + @Override + public void output(OutputT output) { + context.outputWithTimestamp(output, timestamp); + } + + @Override + public void outputWithTimestamp(OutputT output, Instant timestamp) { + context.outputWithTimestamp(output, timestamp); + } + + @Override + public <T> void sideOutput(TupleTag<T> tag, T output) { + context.sideOutputWithTimestamp(tag, output, timestamp); + } + + @Override + public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + context.sideOutputWithTimestamp(tag, output, timestamp); + } + + @Override + protected <AggInputT, AggOutputT> Aggregator<AggInputT, 
AggOutputT> createAggregator( + String name, + CombineFn<AggInputT, ?, AggOutputT> combiner) { + throw new UnsupportedOperationException("Cannot createAggregator in @OnTimer method"); + } + + @Override + public WindowingInternals<InputT, OutputT> windowingInternals() { + throw new UnsupportedOperationException("WindowingInternals are unsupported."); + } + } + + private static class TimerInternalsTimer implements Timer { + private final TimerInternals timerInternals; + private final String timerId; + private final TimerSpec spec; + private final StateNamespace namespace; + + public TimerInternalsTimer( + StateNamespace namespace, String timerId, TimerSpec spec, TimerInternals timerInternals) { + this.namespace = namespace; + this.timerId = timerId; + this.spec = spec; + this.timerInternals = timerInternals; + } + + @Override + public void setForNowPlus(Duration durationFromNow) { + timerInternals.setTimer( + namespace, timerId, getCurrentTime().plus(durationFromNow), spec.getTimeDomain()); + } + + @Override + public void cancel() { + timerInternals.deleteTimer(namespace, timerId); + } + + private Instant getCurrentTime() { + switch(spec.getTimeDomain()) { + case EVENT_TIME: + return timerInternals.currentInputWatermarkTime(); + case PROCESSING_TIME: + return timerInternals.currentProcessingTime(); + case SYNCHRONIZED_PROCESSING_TIME: + return timerInternals.currentSynchronizedProcessingTime(); + default: + throw new IllegalStateException( + String.format("Timer created for unknown time domain %s", spec.getTimeDomain())); + } + } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8af13b01/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java index 1048fdc..342a4a8 100644 
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java @@ -39,6 +39,7 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.ExecutionContext.StepContext; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.SystemDoFnInternal; +import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; @@ -107,6 +108,13 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT } } + @Override + public void onTimer(String timerId, BoundedWindow window, Instant timestamp, + TimeDomain timeDomain) { + throw new UnsupportedOperationException( + String.format("Timers are not supported by %s", OldDoFn.class.getSimpleName())); + } + private void invokeProcessElement(WindowedValue<InputT> elem) { final OldDoFn<InputT, OutputT>.ProcessContext processContext = createProcessContext(elem); // This can contain user code. Wrap it in case it throws an exception. 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8af13b01/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java index 176ab26..a1cdbf6 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.core; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; @@ -37,7 +38,10 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.IdentitySideInputWindowFn; import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.state.StateNamespaces; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.hamcrest.Matchers; @@ -215,8 +219,33 @@ public class PushbackSideInputDoFnRunnerTest { assertThat(underlying.inputElems, containsInAnyOrder(multiWindow)); } + /** Tests that a call to onTimer gets delegated. 
*/ + @Test + public void testOnTimerCalled() { + PushbackSideInputDoFnRunner<Integer, Integer> runner = + createRunner(ImmutableList.<PCollectionView<?>>of()); + + String timerId = "fooTimer"; + IntervalWindow window = new IntervalWindow(new Instant(4), new Instant(16)); + Instant timestamp = new Instant(72); + + // Mocking is not easily compatible with annotation analysis, so we manually record + // the method call. + runner.onTimer(timerId, window, new Instant(timestamp), TimeDomain.EVENT_TIME); + + assertThat( + underlying.firedTimers, + contains( + TimerData.of( + timerId, + StateNamespaces.window(IntervalWindow.getCoder(), window), + timestamp, + TimeDomain.EVENT_TIME))); + } + private static class TestDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> { List<WindowedValue<InputT>> inputElems; + List<TimerData> firedTimers; private boolean started = false; private boolean finished = false; @@ -224,6 +253,7 @@ public class PushbackSideInputDoFnRunnerTest { public void startBundle() { started = true; inputElems = new ArrayList<>(); + firedTimers = new ArrayList<>(); } @Override @@ -232,6 +262,17 @@ public class PushbackSideInputDoFnRunnerTest { } @Override + public void onTimer(String timerId, BoundedWindow window, Instant timestamp, + TimeDomain timeDomain) { + firedTimers.add( + TimerData.of( + timerId, + StateNamespaces.window(IntervalWindow.getCoder(), (IntervalWindow) window), + timestamp, + timeDomain)); + } + + @Override public void finishBundle() { finished = true; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8af13b01/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java new file mode 100644 index 0000000..f068c19 --- /dev/null +++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.BaseExecutionContext.StepContext; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.Timer; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.sdk.util.TimerSpec; +import org.apache.beam.sdk.util.TimerSpecs; +import 
org.apache.beam.sdk.util.UserCodeException; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.StateNamespaces; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** Tests for {@link SimpleDoFnRunner}. */ +@RunWith(JUnit4.class) +public class SimpleDoFnRunnerTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Mock StepContext mockStepContext; + + @Mock TimerInternals mockTimerInternals; + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + when(mockStepContext.timerInternals()).thenReturn(mockTimerInternals); + } + + @Test + public void testProcessElementExceptionsWrappedAsUserCodeException() { + ThrowingDoFn fn = new ThrowingDoFn(); + DoFnRunner<String, String> runner = + new SimpleDoFnRunner<>( + null, + fn, + null, + null, + null, + Collections.<TupleTag<?>>emptyList(), + mockStepContext, + null, + WindowingStrategy.of(new GlobalWindows())); + + thrown.expect(UserCodeException.class); + thrown.expectCause(is(fn.exceptionToThrow)); + + runner.processElement(WindowedValue.valueInGlobalWindow("anyValue")); + } + + @Test + public void testOnTimerExceptionsWrappedAsUserCodeException() { + ThrowingDoFn fn = new ThrowingDoFn(); + DoFnRunner<String, String> runner = + new SimpleDoFnRunner<>( + null, + fn, + null, + null, + null, + Collections.<TupleTag<?>>emptyList(), + mockStepContext, + null, + WindowingStrategy.of(new GlobalWindows())); + + thrown.expect(UserCodeException.class); + thrown.expectCause(is(fn.exceptionToThrow)); + + runner.onTimer( + ThrowingDoFn.TIMER_ID, + GlobalWindow.INSTANCE, + new Instant(0), + 
TimeDomain.EVENT_TIME); + } + + /** + * Tests that a user's call to set a timer gets properly dispatched to the timer internals. From + * there on, it is the duty of the runner and step context to set it in whatever way is right for + * that runner. + */ + @Test + public void testTimerSet() { + WindowFn<?, ?> windowFn = new GlobalWindows(); + DoFnWithTimers<GlobalWindow> fn = new DoFnWithTimers(windowFn.windowCoder()); + DoFnRunner<String, String> runner = + new SimpleDoFnRunner<>( + null, + fn, + null, + null, + null, + Collections.<TupleTag<?>>emptyList(), + mockStepContext, + null, + WindowingStrategy.of(new GlobalWindows())); + + // Setting the timer needs the current time, as the timer is set relative to it + Instant currentTime = new Instant(42); + when(mockTimerInternals.currentInputWatermarkTime()).thenReturn(currentTime); + + runner.processElement(WindowedValue.valueInGlobalWindow("anyValue")); + + verify(mockTimerInternals) + .setTimer( + StateNamespaces.window(new GlobalWindows().windowCoder(), GlobalWindow.INSTANCE), + DoFnWithTimers.TIMER_ID, + currentTime.plus(DoFnWithTimers.TIMER_OFFSET), + TimeDomain.EVENT_TIME); + } + + /** + * Tests that {@link SimpleDoFnRunner#onTimer} properly dispatches to the underlying + * {@link DoFn}. + */ + @Test + public void testOnTimerCalled() { + WindowFn<?, GlobalWindow> windowFn = new GlobalWindows(); + DoFnWithTimers<GlobalWindow> fn = new DoFnWithTimers(windowFn.windowCoder()); + DoFnRunner<String, String> runner = + new SimpleDoFnRunner<>( + null, + fn, + null, + null, + null, + Collections.<TupleTag<?>>emptyList(), + mockStepContext, + null, + WindowingStrategy.of(windowFn)); + + Instant currentTime = new Instant(42); + Duration offset = Duration.millis(37); + + // Mocking is not easily compatible with annotation analysis, so we manually record + // the method call. 
+ runner.onTimer( + DoFnWithTimers.TIMER_ID, + GlobalWindow.INSTANCE, + currentTime.plus(offset), + TimeDomain.EVENT_TIME); + + assertThat( + fn.onTimerInvocations, + contains( + TimerData.of( + DoFnWithTimers.TIMER_ID, + StateNamespaces.window(windowFn.windowCoder(), GlobalWindow.INSTANCE), + currentTime.plus(offset), + TimeDomain.EVENT_TIME))); + } + + static class ThrowingDoFn extends DoFn<String, String> { + final Exception exceptionToThrow = new UnsupportedOperationException("Expected exception"); + + static final String TIMER_ID = "throwingTimerId"; + + @TimerId(TIMER_ID) + private static final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + throw exceptionToThrow; + } + + @OnTimer(TIMER_ID) + public void onTimer(OnTimerContext context) throws Exception { + throw exceptionToThrow; + } + } + + private static class DoFnWithTimers<W extends BoundedWindow> extends DoFn<String, String> { + static final String TIMER_ID = "testTimerId"; + + static final Duration TIMER_OFFSET = Duration.millis(100); + + private final Coder<W> windowCoder; + + // Mutable + List<TimerData> onTimerInvocations; + + DoFnWithTimers(Coder<W> windowCoder) { + this.windowCoder = windowCoder; + this.onTimerInvocations = new ArrayList<>(); + } + + @TimerId(TIMER_ID) + private static final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void process(ProcessContext context, @TimerId(TIMER_ID) Timer timer) { + timer.setForNowPlus(TIMER_OFFSET); + } + + @OnTimer(TIMER_ID) + public void onTimer(OnTimerContext context) { + onTimerInvocations.add( + TimerData.of( + DoFnWithTimers.TIMER_ID, + StateNamespaces.window(windowCoder, (W) context.window()), + context.timestamp(), + context.timeDomain())); + } + } +}