APEXMALHAR-2305 #resolve Mirror the proto-session window behavior described in the streaming 102 blog
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/5131bee0
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/5131bee0
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/5131bee0

Branch: refs/heads/master
Commit: 5131bee0ed0c519294d87f2c460556d0a63f284e
Parents: 2f308aa
Author: David Yan <[email protected]>
Authored: Tue Oct 18 15:41:28 2016 -0700
Committer: Siyuan Hua <[email protected]>
Committed: Wed Oct 19 12:36:20 2016 -0700

----------------------------------------------------------------------
.../apex/malhar/lib/window/SessionWindowedStorage.java | 6 +++---
.../lib/window/impl/InMemorySessionWindowedStorage.java | 2 +-
.../lib/window/impl/KeyedWindowedOperatorImpl.java | 11 ++++++-----
.../lib/window/impl/SpillableSessionWindowedStorage.java | 2 +-
.../apex/malhar/lib/window/WindowedOperatorTest.java | 8 ++++----
5 files changed, 15 insertions(+), 14 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5131bee0/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java
index 3e25d15..b2accbb 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java
@@ -44,10 +44,10 @@ public interface SessionWindowedStorage<K, V> extends WindowedStorage.WindowedKe
void migrateWindow(Window.SessionWindow<K> fromWindow, Window.SessionWindow<K> toWindow);
/**
- * Given the key, the timestamp and the gap, gets the data that falls into timestamp +/- gap.
- * This is used for getting the entry the data given the timestamp belongs to, and for determining whether to merge + * Given the key, the timestamp and the gap, gets the windows that overlaps with timestamp to (timestamp + gap). + * This is used for getting the windows the timestamp belongs to, and for determining whether to merge * session windows. - * This should only return at most two entries if sessions have been merged appropriately. + * This should only return at most two windows if sessions have been merged appropriately. * * @param key the key * @param timestamp the timestamp http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5131bee0/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java index 906b1b9..0247cbc 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java @@ -73,7 +73,7 @@ public class InMemorySessionWindowedStorage<K, V> extends InMemoryWindowedKeyedS Window.SessionWindow<K> refWindow = new Window.SessionWindow<>(key, timestamp, 1); Window.SessionWindow<K> floor = sessionWindows.floor(refWindow); if (floor != null) { - if (floor.getBeginTimestamp() + floor.getDurationMillis() + gap > timestamp) { + if (floor.getBeginTimestamp() + floor.getDurationMillis() > timestamp) { results.add(new AbstractMap.SimpleEntry<>(floor, map.get(floor).get(key))); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5131bee0/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java 
---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java index a33133b..b01fe61 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java @@ -60,12 +60,13 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT> KeyT key = ((KeyValPair<KeyT, ?>)inputTuple.getValue()).getKey(); WindowOption.SessionWindows sessionWindowOption = (WindowOption.SessionWindows)windowOption; SessionWindowedStorage<KeyT, AccumT> sessionStorage = (SessionWindowedStorage<KeyT, AccumT>)dataStorage; - Collection<Map.Entry<Window.SessionWindow<KeyT>, AccumT>> sessionEntries = sessionStorage.getSessionEntries(key, timestamp, sessionWindowOption.getMinGap().getMillis()); + long minGapMillis = sessionWindowOption.getMinGap().getMillis(); + Collection<Map.Entry<Window.SessionWindow<KeyT>, AccumT>> sessionEntries = sessionStorage.getSessionEntries(key, timestamp, minGapMillis); Window.SessionWindow<KeyT> sessionWindowToAssign; switch (sessionEntries.size()) { case 0: { // There are no existing windows within the minimum gap. Create a new session window - Window.SessionWindow<KeyT> sessionWindow = new Window.SessionWindow<>(key, timestamp, 1); + Window.SessionWindow<KeyT> sessionWindow = new Window.SessionWindow<>(key, timestamp, minGapMillis); windowStateMap.put(sessionWindow, new WindowState()); sessionWindowToAssign = sessionWindow; break; @@ -74,7 +75,7 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT> // There is already one existing window within the minimum gap. 
See whether we need to extend the time of that window Map.Entry<Window.SessionWindow<KeyT>, AccumT> sessionWindowEntry = sessionEntries.iterator().next(); Window.SessionWindow<KeyT> sessionWindow = sessionWindowEntry.getKey(); - if (sessionWindow.getBeginTimestamp() <= timestamp && timestamp < sessionWindow.getBeginTimestamp() + sessionWindow.getDurationMillis()) { + if (sessionWindow.getBeginTimestamp() <= timestamp && timestamp + minGapMillis <= sessionWindow.getBeginTimestamp() + sessionWindow.getDurationMillis()) { // The session window already covers the event sessionWindowToAssign = sessionWindow; } else { @@ -86,7 +87,7 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT> } // create a new session window that covers the timestamp long newBeginTimestamp = Math.min(sessionWindow.getBeginTimestamp(), timestamp); - long newEndTimestamp = Math.max(sessionWindow.getBeginTimestamp() + sessionWindow.getDurationMillis(), timestamp + 1); + long newEndTimestamp = Math.max(sessionWindow.getBeginTimestamp() + sessionWindow.getDurationMillis(), timestamp + minGapMillis); Window.SessionWindow<KeyT> newSessionWindow = new Window.SessionWindow<>(key, newBeginTimestamp, newEndTimestamp - newBeginTimestamp); windowStateMap.remove(sessionWindow); @@ -97,7 +98,7 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT> break; } case 2: { - // There are two windows that fall within the minimum gap of the timestamp. We need to merge the two windows + // There are two windows that overlap the proto-session window of the timestamp. 
We need to merge the two windows Iterator<Map.Entry<Window.SessionWindow<KeyT>, AccumT>> iterator = sessionEntries.iterator(); Map.Entry<Window.SessionWindow<KeyT>, AccumT> sessionWindowEntry1 = iterator.next(); Map.Entry<Window.SessionWindow<KeyT>, AccumT> sessionWindowEntry2 = iterator.next(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5131bee0/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java index 8779739..5d39930 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java @@ -106,7 +106,7 @@ public class SpillableSessionWindowedStorage<K, V> extends SpillableWindowedKeye if (sessionWindows != null) { for (Window.SessionWindow<K> window : sessionWindows) { if (timestamp > window.getBeginTimestamp()) { - if (window.getBeginTimestamp() + window.getDurationMillis() + gap > timestamp) { + if (window.getBeginTimestamp() + window.getDurationMillis() > timestamp) { results.add(new AbstractMap.SimpleEntry<>(window, windowKeyToValueMap.get(new ImmutablePair<Window, K>(window, key)))); } } else if (timestamp < window.getBeginTimestamp()) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5131bee0/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java index 15aba82..4a1cef0 100644 --- 
a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java @@ -438,7 +438,7 @@ public class WindowedOperatorTest Assert.assertEquals(1, out.getWindows().size()); Window.SessionWindow<String> window1 = (Window.SessionWindow<String>)out.getWindows().iterator().next(); Assert.assertEquals(1100L, window1.getBeginTimestamp()); - Assert.assertEquals(1, window1.getDurationMillis()); + Assert.assertEquals(2000, window1.getDurationMillis()); Assert.assertEquals("a", window1.getKey()); Assert.assertEquals("a", out.getValue().getKey()); Assert.assertEquals(2L, out.getValue().getValue().longValue()); @@ -461,7 +461,7 @@ public class WindowedOperatorTest Window.SessionWindow<String> window2 = (Window.SessionWindow<String>)out.getWindows().iterator().next(); Assert.assertEquals(1100L, window2.getBeginTimestamp()); - Assert.assertEquals(901, window2.getDurationMillis()); + Assert.assertEquals(2900, window2.getDurationMillis()); Assert.assertEquals("a", out.getValue().getKey()); Assert.assertEquals(5L, out.getValue().getValue().longValue()); sink.clear(); @@ -474,7 +474,7 @@ public class WindowedOperatorTest Assert.assertEquals(1, out.getWindows().size()); Window.SessionWindow<String> window3 = (Window.SessionWindow<String>)out.getWindows().iterator().next(); Assert.assertEquals(5000L, window3.getBeginTimestamp()); - Assert.assertEquals(1, window3.getDurationMillis()); + Assert.assertEquals(2000, window3.getDurationMillis()); Assert.assertEquals("a", out.getValue().getKey()); Assert.assertEquals(4L, out.getValue().getValue().longValue()); sink.clear(); @@ -510,7 +510,7 @@ public class WindowedOperatorTest Assert.assertEquals(1, out.getWindows().size()); Window.SessionWindow<String> window4 = (Window.SessionWindow<String>)out.getWindows().iterator().next(); Assert.assertEquals(1100L, window4.getBeginTimestamp()); - Assert.assertEquals(3901, window4.getDurationMillis()); 
+ Assert.assertEquals(5900, window4.getDurationMillis()); Assert.assertEquals("a", out.getValue().getKey()); Assert.assertEquals(12L, out.getValue().getValue().longValue());
