Repository: beam Updated Branches: refs/heads/master 7e97820c5 -> e1dc7a861
Fix Output Windows in OnTimerContext When a User timer is delivered, output elements produced by that timer firing should be placed within the same window as the timer is in. Deliver timers in the window of their namespace in the DirectRunner. Test that timer deliveries maintain the window the timer was emitted in. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/75100f8b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/75100f8b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/75100f8b Branch: refs/heads/master Commit: 75100f8bddb957522b4ce1a9e3cc2a4d60b2527c Parents: 7e97820 Author: Thomas Groh <[email protected]> Authored: Tue Mar 21 11:26:09 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Tue Mar 21 18:03:51 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/core/SimpleDoFnRunner.java | 13 ++++-- .../direct/StatefulParDoEvaluatorFactory.java | 17 +++++--- .../apache/beam/sdk/transforms/ParDoTest.java | 45 ++++++++++++++++++++ 3 files changed, 66 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/75100f8b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 2b93ca0..f5a559c 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import 
java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Set; @@ -766,22 +767,26 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out @Override public void output(OutputT output) { - context.outputWithTimestamp(output, timestamp); + context.outputWindowedValue( + output, timestamp(), Collections.singleton(window()), PaneInfo.NO_FIRING); } @Override public void outputWithTimestamp(OutputT output, Instant timestamp) { - context.outputWithTimestamp(output, timestamp); + context.outputWindowedValue( + output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING); } @Override public <T> void sideOutput(TupleTag<T> tag, T output) { - context.sideOutputWithTimestamp(tag, output, timestamp); + context.sideOutputWindowedValue( + tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING); } @Override public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - context.sideOutputWithTimestamp(tag, output, timestamp); + context.sideOutputWindowedValue( + tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/75100f8b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java index 0ad40ac..77bebb2 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java @@ -17,11 +17,12 @@ */ package org.apache.beam.runners.direct; +import static 
com.google.common.base.Preconditions.checkState; + import com.google.auto.value.AutoValue; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import java.util.Collections; import java.util.HashMap; @@ -30,6 +31,7 @@ import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; +import org.apache.beam.runners.core.StateNamespaces.WindowNamespace; import org.apache.beam.runners.core.StateTag; import org.apache.beam.runners.core.StateTags; import org.apache.beam.runners.core.TimerInternals.TimerData; @@ -228,15 +230,20 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo @Override public void processElement(WindowedValue<KeyedWorkItem<K, KV<K, InputT>>> gbkResult) throws Exception { - - BoundedWindow window = Iterables.getOnlyElement(gbkResult.getWindows()); - for (WindowedValue<KV<K, InputT>> windowedValue : gbkResult.getValue().elementsIterable()) { delegateEvaluator.processElement(windowedValue); } for (TimerData timer : gbkResult.getValue().timersIterable()) { - delegateEvaluator.onTimer(timer, window); + checkState( + timer.getNamespace() instanceof WindowNamespace, + "Expected Timer %s to be in a %s, but got %s", + timer, + WindowNamespace.class.getSimpleName(), + timer.getNamespace().getClass().getName()); + WindowNamespace<?> windowNamespace = (WindowNamespace) timer.getNamespace(); + BoundedWindow timerWindow = windowNamespace.getWindow(); + delegateEvaluator.onTimer(timer, timerWindow); } } http://git-wip-us.apache.org/repos/asf/beam/blob/75100f8b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index e58f78e..d5786f1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -101,6 +101,7 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.sdk.values.TypeDescriptor; import org.hamcrest.Matchers; import org.joda.time.Duration; import org.joda.time.Instant; @@ -1952,6 +1953,50 @@ public class ParDoTest implements Serializable { pipeline.run(); } + @Test + @Category({RunnableOnService.class, UsesTimersInParDo.class}) + public void testTimerReceivedInOriginalWindow() throws Exception { + final String timerId = "foo"; + + DoFn<KV<String, Integer>, BoundedWindow> fn = + new DoFn<KV<String, Integer>, BoundedWindow>() { + + @TimerId(timerId) + private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) { + timer.setForNowPlus(Duration.standardSeconds(1)); + } + + @OnTimer(timerId) + public void onTimer(OnTimerContext context, BoundedWindow window) { + context.output(context.window()); + } + + public TypeDescriptor<BoundedWindow> getOutputTypeDescriptor() { + return (TypeDescriptor) TypeDescriptor.of(IntervalWindow.class); + } + }; + + SlidingWindows windowing = + SlidingWindows.of(Duration.standardMinutes(3)).every(Duration.standardMinutes(1)); + PCollection<BoundedWindow> output = + pipeline + .apply(Create.timestamped(TimestampedValue.of(KV.of("hello", 24), new Instant(0L)))) + .apply(Window.<KV<String, Integer>>into(windowing)) + .apply(ParDo.of(fn)); + + PAssert.that(output) + .containsInAnyOrder( + new 
IntervalWindow(new Instant(0), Duration.standardMinutes(3)), + new IntervalWindow( + new Instant(0).minus(Duration.standardMinutes(1)), Duration.standardMinutes(3)), + new IntervalWindow( + new Instant(0).minus(Duration.standardMinutes(2)), Duration.standardMinutes(3))); + pipeline.run(); + } + /** * Tests that an event time timer set absolutely for the last possible moment fires and results in * supplementary output. The test is otherwise identical to {@link #testEventTimeTimerBounded()}.
