Add WindowFn#assignsToOneWindow
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7fee4b93 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7fee4b93 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7fee4b93 Branch: refs/heads/gearpump-runner Commit: 7fee4b93d5b548d390ab2511a91880b4c5e57a26 Parents: 2365e71 Author: Thomas Groh <[email protected]> Authored: Tue Jun 27 14:23:22 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Tue Jun 27 15:03:58 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/testing/StaticWindows.java | 5 ++++ .../sdk/transforms/windowing/GlobalWindows.java | 5 ++++ .../windowing/PartitioningWindowFn.java | 5 ++++ .../transforms/windowing/SlidingWindows.java | 5 ++++ .../beam/sdk/transforms/windowing/WindowFn.java | 11 +++++++ .../apache/beam/sdk/util/IdentityWindowFn.java | 5 ++++ .../windowing/SlidingWindowsTest.java | 30 ++++++++++++++++---- 7 files changed, 61 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/7fee4b93/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java index c11057a..eba6978 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java @@ -126,4 +126,9 @@ final class StaticWindows extends NonMergingWindowFn<Object, BoundedWindow> { } }; } + + @Override + public boolean assignsToOneWindow() { + return true; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/7fee4b93/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java index d48d26b..c68c497 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java @@ -79,6 +79,11 @@ public class GlobalWindows extends NonMergingWindowFn<Object, GlobalWindow> { } @Override + public boolean assignsToOneWindow() { + return true; + } + + @Override public boolean equals(Object other) { return other instanceof GlobalWindows; } http://git-wip-us.apache.org/repos/asf/beam/blob/7fee4b93/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java index 40ee68a..341ba27 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java @@ -58,4 +58,9 @@ public abstract class PartitioningWindowFn<T, W extends BoundedWindow> public Instant getOutputTime(Instant inputTimestamp, W window) { return inputTimestamp; } + + @Override + public final boolean assignsToOneWindow() { + return true; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/7fee4b93/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java index f657884..150b956 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java @@ -148,6 +148,11 @@ public class SlidingWindows extends NonMergingWindowFn<Object, IntervalWindow> { } @Override + public boolean assignsToOneWindow() { + return !this.period.isShorterThan(this.size); + } + + @Override public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException { if (!this.isCompatible(other)) { throw new IncompatibleWindowException( http://git-wip-us.apache.org/repos/asf/beam/blob/7fee4b93/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java index 001d630..ffe85f3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java @@ -180,6 +180,17 @@ public abstract class WindowFn<T, W extends BoundedWindow> } /** + * Returns true if this {@link WindowFn} always assigns an element to exactly one window. + * + * <p>If this varies per-element, or cannot be determined, conservatively return false. + * + * <p>By default, returns false. + */ + public boolean assignsToOneWindow() { + return false; + } + + /** * Returns a {@link TypeDescriptor} capturing what is known statically about the window type of * this {@link WindowFn} instance's most-derived class. * http://git-wip-us.apache.org/repos/asf/beam/blob/7fee4b93/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java index a4bfdda..ef6d833 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java @@ -116,4 +116,9 @@ public class IdentityWindowFn<T> extends NonMergingWindowFn<T, BoundedWindow> { public Instant getOutputTime(Instant inputTimestamp, BoundedWindow window) { return inputTimestamp; } + + @Override + public boolean assignsToOneWindow() { + return true; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/7fee4b93/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java index b14e221..bfd01f0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java @@ -21,6 +21,7 @@ import static org.apache.beam.sdk.testing.WindowFnTestUtils.runWindowFn; import static org.apache.beam.sdk.testing.WindowFnTestUtils.set; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; @@ -55,11 +56,12 @@ public class SlidingWindowsTest { expected.put(new IntervalWindow(new Instant(0), new Instant(10)), set(1, 2, 5, 9)); expected.put(new IntervalWindow(new Instant(5), new Instant(15)), set(5, 9, 10, 11)); expected.put(new IntervalWindow(new Instant(10), new Instant(20)), set(10, 11)); + SlidingWindows windowFn = SlidingWindows.of(new Duration(10)).every(new Duration(5)); assertEquals( expected, - runWindowFn( - SlidingWindows.of(new Duration(10)).every(new Duration(5)), + runWindowFn(windowFn, Arrays.asList(1L, 2L, 5L, 9L, 10L, 11L))); + assertThat(windowFn.assignsToOneWindow(), is(false)); } @Test @@ -69,11 +71,27 @@ public class SlidingWindowsTest { expected.put(new IntervalWindow(new Instant(0), new Instant(7)), set(1, 2, 5)); expected.put(new IntervalWindow(new Instant(5), new Instant(12)), set(5, 9, 10, 11)); expected.put(new IntervalWindow(new Instant(10), new Instant(17)), set(10, 11)); + SlidingWindows windowFn = SlidingWindows.of(new Duration(7)).every(new Duration(5)); assertEquals( expected, - runWindowFn( - SlidingWindows.of(new Duration(7)).every(new Duration(5)), + runWindowFn(windowFn, Arrays.asList(1L, 2L, 5L, 9L, 10L, 11L))); + assertThat(windowFn.assignsToOneWindow(), is(false)); + } + + @Test + public void testEqualSize() throws Exception { + Map<IntervalWindow, Set<String>> expected = new HashMap<>(); + expected.put(new IntervalWindow(new Instant(0), new Instant(3)), set(1, 2)); + expected.put(new IntervalWindow(new Instant(3), new Instant(6)), set(3, 4, 5)); + expected.put(new IntervalWindow(new Instant(6), new Instant(9)), set(6, 7)); + SlidingWindows windowFn = SlidingWindows.of(new Duration(3)).every(new Duration(3)); + assertEquals( + expected, + runWindowFn( + windowFn, + Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L))); + assertThat(windowFn.assignsToOneWindow(), is(true)); } @Test @@ -82,12 +100,14 @@ public class SlidingWindowsTest { expected.put(new IntervalWindow(new Instant(0), new Instant(3)), set(1, 2)); expected.put(new IntervalWindow(new Instant(10), new Instant(13)), set(10, 11)); expected.put(new IntervalWindow(new Instant(100), new Instant(103)), set(100)); + SlidingWindows windowFn = SlidingWindows.of(new Duration(3)).every(new Duration(10)); assertEquals( expected, runWindowFn( // Only look at the first 3 millisecs of every 10-millisec interval. - SlidingWindows.of(new Duration(3)).every(new Duration(10)), + windowFn, Arrays.asList(1L, 2L, 3L, 5L, 9L, 10L, 11L, 100L))); + assertThat(windowFn.assignsToOneWindow(), is(true)); } @Test
