[BEAM-1149] Explode windows when fn uses side inputs
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a90c4285 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a90c4285 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a90c4285 Branch: refs/heads/master Commit: a90c4285053821d0015f56be52d81bd18994e405 Parents: dad5ba5 Author: Eugene Kirpichov <[email protected]> Authored: Tue Dec 13 14:35:33 2016 -0800 Committer: Kenneth Knowles <[email protected]> Committed: Tue Dec 13 15:06:46 2016 -0800 ---------------------------------------------------------------------- .../core/PushbackSideInputDoFnRunner.java | 23 +++------- .../core/PushbackSideInputDoFnRunnerTest.java | 16 +++---- .../apache/beam/sdk/transforms/ParDoTest.java | 45 ++++++++++++++++++++ 3 files changed, 60 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a90c4285/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java index 460154d..0bb9153 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java @@ -71,32 +71,23 @@ public class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner< */ public Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) { if (views.isEmpty()) { + // When there are no side inputs, we can preserve the compressed representation. processElement(elem); return Collections.emptyList(); } - ImmutableList.Builder<BoundedWindow> readyWindowsBuilder = ImmutableList.builder(); - ImmutableList.Builder<BoundedWindow> pushedBackWindowsBuilder = ImmutableList.builder(); + ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder(); for (WindowedValue<InputT> windowElem : elem.explodeWindows()) { BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows()); if (isReady(mainInputWindow)) { - readyWindowsBuilder.add(mainInputWindow); + // When there are any side inputs, we have to process the element in each window + // individually, to disambiguate access to per-window side inputs. + processElement(windowElem); } else { notReadyWindows.add(mainInputWindow); - pushedBackWindowsBuilder.add(mainInputWindow); + pushedBack.add(windowElem); } } - ImmutableList<BoundedWindow> readyWindows = readyWindowsBuilder.build(); - ImmutableList<BoundedWindow> pushedBackWindows = pushedBackWindowsBuilder.build(); - if (!readyWindows.isEmpty()) { - processElement( - WindowedValue.of( - elem.getValue(), elem.getTimestamp(), readyWindows, elem.getPane())); - } - return pushedBackWindows.isEmpty() - ? ImmutableList.<WindowedValue<InputT>>of() - : ImmutableList.of( - WindowedValue.of( - elem.getValue(), elem.getTimestamp(), pushedBackWindows, elem.getPane())); + return pushedBack.build(); } private boolean isReady(BoundedWindow mainInputWindow) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a90c4285/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java index f8f4604..176ab26 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java @@ -17,9 +17,9 @@ */ package org.apache.beam.runners.core; -import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.emptyIterable; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.when; @@ -130,7 +130,7 @@ public class PushbackSideInputDoFnRunnerTest { PaneInfo.ON_TIME_AND_ONLY_FIRING); Iterable<WindowedValue<Integer>> multiWindowPushback = runner.processElementInReadyWindows(multiWindow); - assertThat(multiWindowPushback, contains(multiWindow)); + assertThat(multiWindowPushback, equalTo(multiWindow.explodeWindows())); assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable()); } @@ -165,10 +165,8 @@ public class PushbackSideInputDoFnRunnerTest { underlying.inputElems, containsInAnyOrder( WindowedValue.of( - 2, - new Instant(-2), - ImmutableList.of(littleWindow, bigWindow), - PaneInfo.NO_FIRING))); + 2, new Instant(-2), ImmutableList.of(littleWindow), PaneInfo.NO_FIRING), + WindowedValue.of(2, new Instant(-2), ImmutableList.of(bigWindow), PaneInfo.NO_FIRING))); } @Test @@ -191,8 +189,9 @@ public class PushbackSideInputDoFnRunnerTest { Iterable<WindowedValue<Integer>> multiWindowPushback = runner.processElementInReadyWindows(multiWindow); assertThat(multiWindowPushback, emptyIterable()); - assertThat(underlying.inputElems, - containsInAnyOrder(ImmutableList.of(multiWindow).toArray())); + assertThat( + underlying.inputElems, + containsInAnyOrder(ImmutableList.copyOf(multiWindow.explodeWindows()).toArray())); } @Test @@ -212,6 +211,7 @@ public class PushbackSideInputDoFnRunnerTest { Iterable<WindowedValue<Integer>> multiWindowPushback = runner.processElementInReadyWindows(multiWindow); assertThat(multiWindowPushback, emptyIterable()); + // Should preserve the compressed representation when there's no side inputs. assertThat(underlying.inputElems, containsInAnyOrder(multiWindow)); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a90c4285/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 2d118e4..4a3e2dd 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -69,6 +69,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.SlidingWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.Timer; @@ -88,6 +89,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.joda.time.Duration; import org.joda.time.Instant; +import org.joda.time.MutableDateTime; import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; @@ -724,6 +726,49 @@ public class ParDoTest implements Serializable { pipeline.run(); } + private static class FnWithSideInputs extends DoFn<String, String> { + private final PCollectionView<Integer> view; + + private FnWithSideInputs(PCollectionView<Integer> view) { + this.view = view; + } + + @ProcessElement + public void processElement(ProcessContext c) { + c.output(c.element() + ":" + c.sideInput(view)); + } + } + + @Test + @Category(RunnableOnService.class) + public void testSideInputsWithMultipleWindows() { + // Tests that the runner can safely run a DoFn that uses side inputs + // on an input where the element is in multiple windows. The complication is + // that side inputs are per-window, so the runner has to make sure + // to process each window individually. + Pipeline p = TestPipeline.create(); + + MutableDateTime mutableNow = Instant.now().toMutableDateTime(); + mutableNow.setMillisOfSecond(0); + Instant now = mutableNow.toInstant(); + + SlidingWindows windowFn = + SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1)); + PCollectionView<Integer> view = p.apply(Create.of(1)).apply(View.<Integer>asSingleton()); + PCollection<String> res = + p.apply(Create.timestamped(TimestampedValue.of("a", now))) + .apply(Window.<String>into(windowFn)) + .apply(ParDo.of(new FnWithSideInputs(view)).withSideInputs(view)); + + for (int i = 0; i < 4; ++i) { + Instant base = now.minus(Duration.standardSeconds(i)); + IntervalWindow window = new IntervalWindow(base, base.plus(Duration.standardSeconds(5))); + PAssert.that(res).inWindow(window).containsInAnyOrder("a:1"); + } + + p.run(); + } + @Test @Category(NeedsRunner.class) public void testParDoWithErrorInStartBatch() {
