Repository: incubator-beam Updated Branches: refs/heads/master f2d2ce5f4 -> 82ae661c5
Explode windows in DirectRunner's Window.into evaluator Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a3aa4c7f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a3aa4c7f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a3aa4c7f Branch: refs/heads/master Commit: a3aa4c7f1bd54122358c8e41e984a0d0000c160b Parents: edf11fa Author: Kenneth Knowles <[email protected]> Authored: Mon Jun 20 11:37:45 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Tue Jun 21 20:58:31 2016 -0700 ---------------------------------------------------------------------- .../runners/direct/WindowEvaluatorFactory.java | 12 +- .../direct/WindowEvaluatorFactoryTest.java | 174 ++++++++++++------- .../org/apache/beam/sdk/WindowMatchers.java | 80 ++++++++- 3 files changed, 193 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3aa4c7f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java index b07b58a..6045912 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java @@ -82,11 +82,13 @@ class WindowEvaluatorFactory implements TransformEvaluatorFactory { } @Override - public void processElement(WindowedValue<InputT> element) throws Exception { - Collection<? extends BoundedWindow> windows = assignWindows(windowFn, element); - outputBundle.add( - WindowedValue.<InputT>of( - element.getValue(), element.getTimestamp(), windows, PaneInfo.NO_FIRING)); + public void processElement(WindowedValue<InputT> compressedElement) throws Exception { + for (WindowedValue<InputT> element : compressedElement.explodeWindows()) { + Collection<? extends BoundedWindow> windows = assignWindows(windowFn, element); + outputBundle.add( + WindowedValue.<InputT>of( + element.getValue(), element.getTimestamp(), windows, PaneInfo.NO_FIRING)); + } } private <W extends BoundedWindow> Collection<? extends BoundedWindow> assignWindows( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3aa4c7f/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java index 71abcca..c5faa5a 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java @@ -17,6 +17,9 @@ */ package org.apache.beam.runners.direct; +import static org.apache.beam.sdk.WindowMatchers.isSingleWindowedValue; +import static org.apache.beam.sdk.WindowMatchers.isWindowedValue; + import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.when; @@ -73,23 +76,30 @@ public class WindowEvaluatorFactoryTest { private BundleFactory bundleFactory; - private WindowedValue<Long> first = + private WindowedValue<Long> valueInGlobalWindow = WindowedValue.timestampedValueInGlobalWindow(3L, new Instant(2L)); - private WindowedValue<Long> second = - WindowedValue.of(Long.valueOf(1L), - EPOCH.plus(Duration.standardDays(3)), - ImmutableList.of(GlobalWindow.INSTANCE, - new IntervalWindow(EPOCH, BoundedWindow.TIMESTAMP_MAX_VALUE), - new IntervalWindow(EPOCH.plus(Duration.standardDays(3)), - EPOCH.plus(Duration.standardDays(6)))), - PaneInfo.NO_FIRING); - private WindowedValue<Long> third = + + private WindowedValue<Long> valueInIntervalWindow = WindowedValue.of( Long.valueOf(2L), new Instant(-10L), new IntervalWindow(new Instant(-100), EPOCH), PaneInfo.NO_FIRING); + private IntervalWindow intervalWindow1 = + new IntervalWindow(EPOCH, BoundedWindow.TIMESTAMP_MAX_VALUE); + + private IntervalWindow intervalWindow2 = + new IntervalWindow( + EPOCH.plus(Duration.standardDays(3)), EPOCH.plus(Duration.standardDays(6))); + + private WindowedValue<Long> valueInGlobalAndTwoIntervalWindows = + WindowedValue.of( + Long.valueOf(1L), + EPOCH.plus(Duration.standardDays(3)), + ImmutableList.of(GlobalWindow.INSTANCE, intervalWindow1, intervalWindow2), + PaneInfo.NO_FIRING); + @Before public void setup() { MockitoAnnotations.initMocks(this); @@ -118,7 +128,10 @@ public class WindowEvaluatorFactoryTest { Iterables.getOnlyElement(result.getOutputBundles()), Matchers.<UncommittedBundle<?>>equalTo(outputBundle)); CommittedBundle<Long> committed = outputBundle.commit(Instant.now()); - assertThat(committed.getElements(), containsInAnyOrder(third, first, second)); + assertThat( + committed.getElements(), + containsInAnyOrder( + valueInIntervalWindow, valueInGlobalWindow, valueInGlobalAndTwoIntervalWindows)); } @Test @@ -141,16 +154,22 @@ public class WindowEvaluatorFactoryTest { Matchers.<UncommittedBundle<?>>equalTo(outputBundle)); CommittedBundle<Long> committed = outputBundle.commit(Instant.now()); - WindowedValue<Long> expectedNewFirst = - WindowedValue.of(3L, new Instant(2L), firstSecondWindow, PaneInfo.NO_FIRING); - WindowedValue<Long> expectedNewSecond = - WindowedValue.of( - 1L, EPOCH.plus(Duration.standardDays(3)), firstSecondWindow, PaneInfo.NO_FIRING); - WindowedValue<Long> expectedNewThird = - WindowedValue.of(2L, new Instant(-10L), thirdWindow, PaneInfo.NO_FIRING); assertThat( committed.getElements(), - containsInAnyOrder(expectedNewFirst, expectedNewSecond, expectedNewThird)); + containsInAnyOrder( + // value in global window + isSingleWindowedValue(3L, new Instant(2L), firstSecondWindow, PaneInfo.NO_FIRING), + + // value in just interval window + isSingleWindowedValue(2L, new Instant(-10L), thirdWindow, PaneInfo.NO_FIRING), + + // value in global window and two interval windows + isSingleWindowedValue( + 1L, EPOCH.plus(Duration.standardDays(3)), firstSecondWindow, PaneInfo.NO_FIRING), + isSingleWindowedValue( + 1L, EPOCH.plus(Duration.standardDays(3)), firstSecondWindow, PaneInfo.NO_FIRING), + isSingleWindowedValue( + 1L, EPOCH.plus(Duration.standardDays(3)), firstSecondWindow, PaneInfo.NO_FIRING))); } @Test @@ -177,24 +196,39 @@ public class WindowEvaluatorFactoryTest { BoundedWindow wMinusSlide = new IntervalWindow(EPOCH.minus(windowDuration).plus(slidingBy), EPOCH.plus(slidingBy)); - WindowedValue<Long> expectedFirst = - WindowedValue.of( - first.getValue(), - first.getTimestamp(), - ImmutableSet.of(w1, wMinusSlide), - PaneInfo.NO_FIRING); - WindowedValue<Long> expectedSecond = - WindowedValue.of( - second.getValue(), second.getTimestamp(), ImmutableSet.of(w1, w2), PaneInfo.NO_FIRING); - WindowedValue<Long> expectedThird = - WindowedValue.of( - third.getValue(), - third.getTimestamp(), - ImmutableSet.of(wMinus1, wMinusSlide), - PaneInfo.NO_FIRING); - assertThat( - committed.getElements(), containsInAnyOrder(expectedFirst, expectedSecond, expectedThird)); + committed.getElements(), + containsInAnyOrder( + // Value in global window mapped to one windowed value in multiple windows + isWindowedValue( + valueInGlobalWindow.getValue(), + valueInGlobalWindow.getTimestamp(), + ImmutableSet.of(w1, wMinusSlide), + PaneInfo.NO_FIRING), + + // Value in interval window mapped to one windowed value in multiple windows + isWindowedValue( + valueInIntervalWindow.getValue(), + valueInIntervalWindow.getTimestamp(), + ImmutableSet.of(wMinus1, wMinusSlide), + PaneInfo.NO_FIRING), + + // Value in three windows mapped to three windowed values in the same multiple windows + isWindowedValue( + valueInGlobalAndTwoIntervalWindows.getValue(), + valueInGlobalAndTwoIntervalWindows.getTimestamp(), + ImmutableSet.of(w1, w2), + PaneInfo.NO_FIRING), + isWindowedValue( + valueInGlobalAndTwoIntervalWindows.getValue(), + valueInGlobalAndTwoIntervalWindows.getTimestamp(), + ImmutableSet.of(w1, w2), + PaneInfo.NO_FIRING), + isWindowedValue( + valueInGlobalAndTwoIntervalWindows.getValue(), + valueInGlobalAndTwoIntervalWindows.getTimestamp(), + ImmutableSet.of(w1, w2), + PaneInfo.NO_FIRING))); } @Test @@ -212,34 +246,54 @@ public class WindowEvaluatorFactoryTest { Matchers.<UncommittedBundle<?>>equalTo(outputBundle)); CommittedBundle<Long> committed = outputBundle.commit(Instant.now()); - WindowedValue<Long> expectedFirst = - WindowedValue.of( - first.getValue(), - first.getTimestamp(), - new IntervalWindow(first.getTimestamp(), first.getTimestamp().plus(1L)), - PaneInfo.NO_FIRING); - WindowedValue<Long> expectedSecond = WindowedValue.of(second.getValue(), - second.getTimestamp(), - new IntervalWindow(second.getTimestamp(), second.getTimestamp().plus(1L)), - PaneInfo.NO_FIRING); - WindowedValue<Long> expectedThird = - WindowedValue.of( - third.getValue(), - third.getTimestamp(), - third.getWindows(), - PaneInfo.NO_FIRING); - assertThat( - committed.getElements(), containsInAnyOrder(expectedFirst, expectedSecond, expectedThird)); + committed.getElements(), + containsInAnyOrder( + // Value in global window mapped to [timestamp, timestamp+1) + isSingleWindowedValue( + valueInGlobalWindow.getValue(), + valueInGlobalWindow.getTimestamp(), + new IntervalWindow( + valueInGlobalWindow.getTimestamp(), + valueInGlobalWindow.getTimestamp().plus(1L)), + PaneInfo.NO_FIRING), + + // Value in interval window mapped to the same window + isWindowedValue( + valueInIntervalWindow.getValue(), + valueInIntervalWindow.getTimestamp(), + valueInIntervalWindow.getWindows(), + PaneInfo.NO_FIRING), + + // Value in global window and two interval windows exploded and mapped in both ways + isSingleWindowedValue( + valueInGlobalAndTwoIntervalWindows.getValue(), + valueInGlobalAndTwoIntervalWindows.getTimestamp(), + new IntervalWindow( + valueInGlobalAndTwoIntervalWindows.getTimestamp(), + valueInGlobalAndTwoIntervalWindows.getTimestamp().plus(1L)), + PaneInfo.NO_FIRING), + + isSingleWindowedValue( + valueInGlobalAndTwoIntervalWindows.getValue(), + valueInGlobalAndTwoIntervalWindows.getTimestamp(), + intervalWindow1, + PaneInfo.NO_FIRING), + + isSingleWindowedValue( + valueInGlobalAndTwoIntervalWindows.getValue(), + valueInGlobalAndTwoIntervalWindows.getTimestamp(), + intervalWindow2, + PaneInfo.NO_FIRING))); } private CommittedBundle<Long> createInputBundle() { CommittedBundle<Long> inputBundle = bundleFactory .createRootBundle(input) - .add(first) - .add(second) - .add(third) + .add(valueInGlobalWindow) + .add(valueInGlobalAndTwoIntervalWindows) + .add(valueInIntervalWindow) .commit(Instant.now()); return inputBundle; } @@ -262,9 +316,9 @@ public class WindowEvaluatorFactoryTest { inputBundle, evaluationContext); - evaluator.processElement(first); - evaluator.processElement(second); - evaluator.processElement(third); + evaluator.processElement(valueInGlobalWindow); + evaluator.processElement(valueInGlobalAndTwoIntervalWindows); + evaluator.processElement(valueInIntervalWindow); TransformResult result = evaluator.finishBundle(); return result; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3aa4c7f/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java index 7a5e2fb..48c2589 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java @@ -22,6 +22,8 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; +import com.google.common.collect.Lists; + import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.Matchers; @@ -37,19 +39,51 @@ import java.util.Objects; public class WindowMatchers { public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue( - Matcher<? super T> valueMatcher, Matcher<? super Instant> timestampMatcher, + T value, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo paneInfo) { + + Collection<Matcher<? super BoundedWindow>> windowMatchers = + Lists.newArrayListWithCapacity(windows.size()); + for (BoundedWindow window : windows) { + windowMatchers.add(Matchers.equalTo(window)); + } + + return isWindowedValue( + Matchers.equalTo(value), + Matchers.equalTo(timestamp), + Matchers.containsInAnyOrder(windowMatchers), + Matchers.equalTo(paneInfo)); + } + + public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue( + Matcher<? super T> valueMatcher, + Matcher<? super Instant> timestampMatcher, + Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher, + Matcher<? super PaneInfo> paneInfoMatcher) { + return new WindowedValueMatcher<>( + valueMatcher, timestampMatcher, windowsMatcher, paneInfoMatcher); + } + + public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue( + Matcher<? super T> valueMatcher, + Matcher<? super Instant> timestampMatcher, Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher) { - return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, windowsMatcher); + return new WindowedValueMatcher<>( + valueMatcher, timestampMatcher, windowsMatcher, Matchers.anything()); } public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue( Matcher<? super T> valueMatcher, Matcher<? super Instant> timestampMatcher) { - return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, Matchers.anything()); + return new WindowedValueMatcher<>( + valueMatcher, timestampMatcher, Matchers.anything(), Matchers.anything()); } public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue( Matcher<? super T> valueMatcher) { - return new WindowedValueMatcher<>(valueMatcher, Matchers.anything(), Matchers.anything()); + return new WindowedValueMatcher<>( + valueMatcher, Matchers.anything(), Matchers.anything(), Matchers.anything()); } public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue( @@ -59,20 +93,46 @@ public class WindowMatchers { } public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue( + T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) { + return WindowMatchers.<T>isSingleWindowedValue( + Matchers.equalTo(value), + Matchers.equalTo(timestamp), + Matchers.equalTo(window), + Matchers.equalTo(paneInfo)); + } + + public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue( + T value, Instant timestamp, BoundedWindow window) { + return WindowMatchers.<T>isSingleWindowedValue( + Matchers.equalTo(value), Matchers.equalTo(timestamp), Matchers.equalTo(window)); + } + + public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue( Matcher<T> valueMatcher, long timestamp, long windowStart, long windowEnd) { IntervalWindow intervalWindow = new IntervalWindow(new Instant(windowStart), new Instant(windowEnd)); return WindowMatchers.<T>isSingleWindowedValue( valueMatcher, Matchers.describedAs("%0", Matchers.equalTo(new Instant(timestamp)), timestamp), - Matchers.<BoundedWindow>equalTo(intervalWindow)); + Matchers.<BoundedWindow>equalTo(intervalWindow), + Matchers.anything()); } public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue( - Matcher<? super T> valueMatcher, Matcher<? super Instant> timestampMatcher, + Matcher<? super T> valueMatcher, + Matcher<? super Instant> timestampMatcher, Matcher<? super BoundedWindow> windowMatcher) { return new WindowedValueMatcher<T>( - valueMatcher, timestampMatcher, Matchers.contains(windowMatcher)); + valueMatcher, timestampMatcher, Matchers.contains(windowMatcher), Matchers.anything()); + } + + public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue( + Matcher<? super T> valueMatcher, + Matcher<? super Instant> timestampMatcher, + Matcher<? super BoundedWindow> windowMatcher, + Matcher<? super PaneInfo> paneInfoMatcher) { + return new WindowedValueMatcher<T>( + valueMatcher, timestampMatcher, Matchers.contains(windowMatcher), paneInfoMatcher); } public static Matcher<IntervalWindow> intervalWindow(long start, long end) { @@ -114,14 +174,17 @@ public class WindowMatchers { private Matcher<? super T> valueMatcher; private Matcher<? super Instant> timestampMatcher; private Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher; + private Matcher<? super PaneInfo> paneInfoMatcher; private WindowedValueMatcher( Matcher<? super T> valueMatcher, Matcher<? super Instant> timestampMatcher, - Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher) { + Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher, + Matcher<? super PaneInfo> paneInfoMatcher) { this.valueMatcher = valueMatcher; this.timestampMatcher = timestampMatcher; this.windowsMatcher = windowsMatcher; + this.paneInfoMatcher = paneInfoMatcher; } @Override @@ -130,6 +193,7 @@ public class WindowMatchers { .appendText("a WindowedValue(").appendValue(valueMatcher) .appendText(", ").appendValue(timestampMatcher) .appendText(", ").appendValue(windowsMatcher) + .appendText(", ").appendValue(paneInfoMatcher) .appendText(")"); }
