Repository: incubator-apex-core Updated Branches: refs/heads/devel-3 711fd0708 -> e512610ed
APEX-98 #resolve fixed problem with precision in the window to time utility function in WindowGenerator Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/e512610e Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/e512610e Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/e512610e Branch: refs/heads/devel-3 Commit: e512610ed6a84f7a4d0d904bd9e010ac56cf109c Parents: 711fd07 Author: David Yan <[email protected]> Authored: Tue Sep 8 16:06:55 2015 -0700 Committer: David Yan <[email protected]> Committed: Tue Sep 8 18:00:54 2015 -0700 ---------------------------------------------------------------------- .../stram/engine/WindowGenerator.java | 44 +++++++++++++- .../stram/engine/WindowGeneratorTest.java | 62 +++++++++++++++++--- 2 files changed, 96 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/e512610e/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java index e0ea4d0..83f4790 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java @@ -269,6 +269,34 @@ public class WindowGenerator extends MuxReservoir implements Stream, Runnable return getWindowMillis(windowId, firstWindowMillis, windowWidthMillis) + windowWidthMillis; } + public static long getNextWindowId(long windowId, long firstWindowMillis, long windowWidthMillis) + { + return getAheadWindowId(windowId, firstWindowMillis, windowWidthMillis, 1); + } + + public static long getAheadWindowId(long windowId, long firstWindowMillis, long windowWidthMillis, int ahead) + { + long millis = getWindowMillis(windowId, firstWindowMillis, windowWidthMillis); + millis += ahead * windowWidthMillis; + return getWindowId(millis, firstWindowMillis, windowWidthMillis); + } + + /** + * Returns the number of windows windowIdA is ahead of windowIdB. + * + * @param windowIdA + * @param windowIdB + * @param firstWindowMillis + * @param windowWidthMillis + * @return the number of windows ahead, negative if windowIdA is behind windowIdB + */ + public static long compareWindowId(long windowIdA, long windowIdB, long firstWindowMillis, long windowWidthMillis) + { + long millisA = getWindowMillis(windowIdA, firstWindowMillis, windowWidthMillis); + long millisB = getWindowMillis(windowIdB, firstWindowMillis, windowWidthMillis); + return (millisA - millisB) / windowWidthMillis; + } + /** * @param windowId * @param firstWindowMillis @@ -277,9 +305,19 @@ public class WindowGenerator extends MuxReservoir implements Stream, Runnable */ public static long getWindowMillis(long windowId, long firstWindowMillis, long windowWidthMillis) { - long millis = (windowId >> 32) * 1000 + windowWidthMillis * (windowId & WindowGenerator.WINDOW_MASK); - millis = millis > firstWindowMillis ? millis : firstWindowMillis; - return millis; + if (windowId == -1) { + return firstWindowMillis; + } + long baseMillis = (windowId >> 32) * 1000; + long diff = baseMillis - firstWindowMillis; + long baseChangeInterval = windowWidthMillis * (WindowGenerator.MAX_WINDOW_ID + 1); + long multiplier = diff / baseChangeInterval; + if (diff % baseChangeInterval > 0) { + multiplier++; + } + assert (multiplier >= 0); + windowId = windowId & WindowGenerator.WINDOW_MASK; + return firstWindowMillis + (multiplier * windowWidthMillis * (WindowGenerator.MAX_WINDOW_ID + 1)) + windowId * windowWidthMillis; } private class MasterReservoir extends CircularBuffer<Tuple> implements Reservoir http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/e512610e/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java index 4665d79..1ef473d 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java @@ -321,16 +321,64 @@ public class WindowGeneratorTest { long first = 1431714014000L; - long time1 = WindowGenerator.getWindowMillis(6149164867354886271L, first, 500); - long time2 = WindowGenerator.getWindowMillis(6149164867354886272L, first, 500); + for (int windowWidthMillis : new int[]{500, 123}) { + long time1 = WindowGenerator.getWindowMillis(6149164867354886271L, first, windowWidthMillis); + long time2 = WindowGenerator.getWindowMillis(6149164867354886272L, first, windowWidthMillis); - long window1 = WindowGenerator.getWindowId(time1, first, 500); - long window2 = WindowGenerator.getWindowId(time2, first, 500); + long window1 = WindowGenerator.getWindowId(time1, first, windowWidthMillis); + long window2 = WindowGenerator.getWindowId(time2, first, windowWidthMillis); - Assert.assertEquals("window 1", 6149164867354886271L, window1); - Assert.assertEquals("window 2", 6149164867354886272L, window2); + Assert.assertEquals("window 1", 6149164867354886271L, window1); + Assert.assertEquals("window 2", 6149164867354886272L, window2); - Assert.assertTrue(time2 > time1); + Assert.assertEquals("window millis difference", windowWidthMillis, time2 - time1); + } + } + + @Test + public void testWindowToTimeBaseSecondRollover() + { + long first = 1431714014123L; + + for (int windowWidthMillis : new int[]{500, 123}) { + long window1 = WindowGenerator.getWindowId(first, first, windowWidthMillis); + window1 |= WindowGenerator.MAX_WINDOW_ID; + long window2 = WindowGenerator.getNextWindowId(window1, first, windowWidthMillis); + Assert.assertTrue("base seconds should be greater during an rollover", (window2 >> 32) > (window1 >> 32)); + long time1 = WindowGenerator.getWindowMillis(window1, first, windowWidthMillis); + long time2 = WindowGenerator.getWindowMillis(window2, first, windowWidthMillis); + + Assert.assertEquals("max window id", WindowGenerator.MAX_WINDOW_ID, window1 & WindowGenerator.WINDOW_MASK); + Assert.assertEquals("rollover after max", 0, window2 & WindowGenerator.WINDOW_MASK); + Assert.assertEquals("window millis difference", windowWidthMillis, time2 - time1); + } + } + + @Test + public void testWindowIdAhead() + { + long first = 1431714014123L; + int ahead = 678; + for (int windowWidthMillis : new int[]{500, 123}) { + long window1 = WindowGenerator.getWindowId(first, first, windowWidthMillis); + long window2 = WindowGenerator.getAheadWindowId(window1, first, windowWidthMillis, ahead); + for (int i = 0; i < ahead; i++) { + window1 = WindowGenerator.getNextWindowId(window1, first, windowWidthMillis); + } + Assert.assertEquals(window2, window1); + } + } + + @Test + public void testWindowIdCompare() + { + long first = 1431714014123L; + int ahead = 341; + for (int windowWidthMillis : new int[]{500, 123}) { + long window1 = WindowGenerator.getWindowId(first, first, windowWidthMillis); + long window2 = WindowGenerator.getAheadWindowId(window1, first, windowWidthMillis, ahead); + Assert.assertEquals(ahead, WindowGenerator.compareWindowId(window2, window1, first, windowWidthMillis)); + } } public static final Logger logger = LoggerFactory.getLogger(WindowGeneratorTest.class);
