Repository: incubator-beam Updated Branches: refs/heads/master 135cb733f -> 0952f4433
Add WindowedValue#explodeWindows This takes an existing WindowedValue and returns a Collection of WindowedValues, each of which is in exactly one window. Use the explode implementation on DoFnRunnerBase Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/98c9d99d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/98c9d99d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/98c9d99d Branch: refs/heads/master Commit: 98c9d99d27224012637e96839aee0721200dc351 Parents: 135cb73 Author: Thomas Groh <[email protected]> Authored: Mon Apr 18 16:55:57 2016 -0700 Committer: bchambers <[email protected]> Committed: Tue Apr 19 12:45:17 2016 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/util/DoFnRunnerBase.java | 5 +- .../org/apache/beam/sdk/util/WindowedValue.java | 13 +++++ .../apache/beam/sdk/util/WindowedValueTest.java | 53 ++++++++++++++++++++ 3 files changed, 68 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/98c9d99d/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java index e9202a2..75861fe 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java @@ -141,9 +141,8 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu } else { // We could modify the windowed value (and the processContext) to // avoid repeated allocations, but this is more straightforward. - for (BoundedWindow window : elem.getWindows()) { - invokeProcessElement(WindowedValue.of( - elem.getValue(), elem.getTimestamp(), window, elem.getPane())); + for (WindowedValue<InputT> windowedValue : elem.explodeWindows()) { + invokeProcessElement(windowedValue); } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/98c9d99d/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java index f6e82cf..1bbdbd9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java @@ -35,6 +35,7 @@ import org.apache.beam.sdk.util.common.ElementByteSizeObserver; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -175,6 +176,18 @@ public abstract class WindowedValue<T> { public abstract Collection<? extends BoundedWindow> getWindows(); /** + * Returns a collection of {@link WindowedValue WindowedValues} identical to this one, except each + * is in exactly one of the windows that this {@link WindowedValue} is in. + */ + public Iterable<WindowedValue<T>> explodeWindows() { + ImmutableList.Builder<WindowedValue<T>> windowedValues = ImmutableList.builder(); + for (BoundedWindow w : getWindows()) { + windowedValues.add(of(getValue(), getTimestamp(), w, getPane())); + } + return windowedValues.build(); + } + + /** * Returns the pane of this {@code WindowedValue} in its window. */ public PaneInfo getPane() { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/98c9d99d/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java index c2c22c0..90969b7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java @@ -17,11 +17,21 @@ */ package org.apache.beam.sdk.util; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.emptyIterable; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import org.joda.time.Instant; import org.junit.Assert; @@ -55,4 +65,47 @@ public class WindowedValueTest { Assert.assertEquals(value.getTimestamp(), decodedValue.getTimestamp()); Assert.assertArrayEquals(value.getWindows().toArray(), decodedValue.getWindows().toArray()); } + + @Test + public void testExplodeWindowsInNoWindowsEmptyIterable() { + WindowedValue<String> value = + WindowedValue.of( + "foo", Instant.now(), ImmutableList.<BoundedWindow>of(), PaneInfo.NO_FIRING); + + assertThat(value.explodeWindows(), emptyIterable()); + } + + @Test + public void testExplodeWindowsInOneWindowEquals() { + Instant now = Instant.now(); + BoundedWindow window = new IntervalWindow(now.minus(1000L), now.plus(1000L)); + WindowedValue<String> value = + WindowedValue.of("foo", now, window, PaneInfo.ON_TIME_AND_ONLY_FIRING); + + assertThat(Iterables.getOnlyElement(value.explodeWindows()), equalTo(value)); + } + + @Test + public void testExplodeWindowsManyWindowsMultipleWindowedValues() { + Instant now = Instant.now(); + BoundedWindow centerWindow = new IntervalWindow(now.minus(1000L), now.plus(1000L)); + BoundedWindow pastWindow = new IntervalWindow(now.minus(1500L), now.plus(500L)); + BoundedWindow futureWindow = new IntervalWindow(now.minus(500L), now.plus(1500L)); + BoundedWindow futureFutureWindow = new IntervalWindow(now, now.plus(2000L)); + PaneInfo pane = PaneInfo.createPane(false, false, Timing.ON_TIME, 3L, 0L); + WindowedValue<String> value = + WindowedValue.of( + "foo", + now, + ImmutableList.of(pastWindow, centerWindow, futureWindow, futureFutureWindow), + pane); + + assertThat( + value.explodeWindows(), + containsInAnyOrder( + WindowedValue.of("foo", now, futureFutureWindow, pane), + WindowedValue.of("foo", now, futureWindow, pane), + WindowedValue.of("foo", now, centerWindow, pane), + WindowedValue.of("foo", now, pastWindow, pane))); + } }
