Truncate the very last fixed window if it goes beyond representable time
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/296cba00 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/296cba00 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/296cba00 Branch: refs/heads/master Commit: 296cba009a5c979223fb61bd411816169eaad515 Parents: 555ba40 Author: Kenneth Knowles <[email protected]> Authored: Fri Oct 27 10:51:45 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Mon Nov 13 15:03:21 2017 -0800 ---------------------------------------------------------------------- .../beam/runners/core/LateDataUtilsTest.java | 2 +- .../sdk/transforms/windowing/FixedWindows.java | 24 +++++++++++++++++--- .../transforms/windowing/FixedWindowsTest.java | 12 ++++++++++ 3 files changed, 34 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/296cba00/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java index f0f315d..cef865c 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java @@ -64,7 +64,7 @@ public class LateDataUtilsTest { IntervalWindow window = windowFn.assignWindow(new Instant(BoundedWindow.TIMESTAMP_MAX_VALUE)); assertThat( window.maxTimestamp(), - Matchers.<ReadableInstant>greaterThan(GlobalWindow.INSTANCE.maxTimestamp())); + equalTo(GlobalWindow.INSTANCE.maxTimestamp())); assertThat( LateDataUtils.garbageCollectionTime(window, strategy), equalTo(GlobalWindow.INSTANCE.maxTimestamp())); http://git-wip-us.apache.org/repos/asf/beam/blob/296cba00/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java index 8b16916..6c9376c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java @@ -76,9 +76,27 @@ public class FixedWindows extends PartitioningWindowFn<Object, IntervalWindow> { @Override public IntervalWindow assignWindow(Instant timestamp) { - long start = timestamp.getMillis() - - timestamp.plus(size).minus(offset).getMillis() % size.getMillis(); - return new IntervalWindow(new Instant(start), size); + Instant start = + new Instant( + timestamp.getMillis() + - timestamp.plus(size).minus(offset).getMillis() % size.getMillis()); + + + // The global window is inclusive of max timestamp, while interval window excludes its + // upper bound + Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp().plus(1); + + // The end of the window is either start + size if that is within the allowable range, otherwise + // the end of the global window. Truncating the window drives many other + // areas of this system in the appropriate way automatically. + // + // Though it is curious that the very last representable fixed window is shorter than the rest, + // when we are processing data in the year 294247, we'll probably have technology that can + // account for this. + Instant end = + start.isAfter(endOfGlobalWindow.minus(size)) ? endOfGlobalWindow : start.plus(size); + + return new IntervalWindow(start, end); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/296cba00/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java index 80a534c..8dc02f9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java @@ -107,6 +107,18 @@ public class FixedWindowsTest { assertThat(mapping.maximumLookback(), equalTo(Duration.ZERO)); } + /** Tests that the last hour of the universe in fact ends at the end of time. */ + @Test + public void testEndOfTime() { + Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp(); + FixedWindows windowFn = FixedWindows.of(Duration.standardHours(1)); + + IntervalWindow truncatedWindow = + windowFn.assignWindow(endOfGlobalWindow.minus(1)); + + assertThat(truncatedWindow.maxTimestamp(), equalTo(endOfGlobalWindow)); + } + @Test public void testDefaultWindowMappingFnGlobalWindow() { PartitioningWindowFn<?, ?> windowFn = FixedWindows.of(Duration.standardMinutes(20L));
