APEXMALHAR-2305 #resolve Mirror the proto-session window behavior described in 
the streaming 102 blog


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/5131bee0
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/5131bee0
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/5131bee0

Branch: refs/heads/master
Commit: 5131bee0ed0c519294d87f2c460556d0a63f284e
Parents: 2f308aa
Author: David Yan <[email protected]>
Authored: Tue Oct 18 15:41:28 2016 -0700
Committer: Siyuan Hua <[email protected]>
Committed: Wed Oct 19 12:36:20 2016 -0700

----------------------------------------------------------------------
 .../apex/malhar/lib/window/SessionWindowedStorage.java   |  6 +++---
 .../lib/window/impl/InMemorySessionWindowedStorage.java  |  2 +-
 .../lib/window/impl/KeyedWindowedOperatorImpl.java       | 11 ++++++-----
 .../lib/window/impl/SpillableSessionWindowedStorage.java |  2 +-
 .../apex/malhar/lib/window/WindowedOperatorTest.java     |  8 ++++----
 5 files changed, 15 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5131bee0/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java
index 3e25d15..b2accbb 100644
--- 
a/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java
@@ -44,10 +44,10 @@ public interface SessionWindowedStorage<K, V> extends 
WindowedStorage.WindowedKe
   void migrateWindow(Window.SessionWindow<K> fromWindow, 
Window.SessionWindow<K> toWindow);
 
   /**
-   * Given the key, the timestamp and the gap, gets the data that falls into 
timestamp +/- gap.
-   * This is used for getting the entry the data given the timestamp belongs 
to, and for determining whether to merge
+   * Given the key, the timestamp and the gap, gets the windows that overlap the range from timestamp to (timestamp + gap).
+   * This is used for getting the windows the timestamp belongs to, and for determining whether to merge
    * session windows.
-   * This should only return at most two entries if sessions have been merged 
appropriately.
+   * This should only return at most two windows if sessions have been merged 
appropriately.
    *
    * @param key the key
    * @param timestamp the timestamp

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5131bee0/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java
index 906b1b9..0247cbc 100644
--- 
a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java
@@ -73,7 +73,7 @@ public class InMemorySessionWindowedStorage<K, V> extends 
InMemoryWindowedKeyedS
       Window.SessionWindow<K> refWindow = new Window.SessionWindow<>(key, 
timestamp, 1);
       Window.SessionWindow<K> floor = sessionWindows.floor(refWindow);
       if (floor != null) {
-        if (floor.getBeginTimestamp() + floor.getDurationMillis() + gap > 
timestamp) {
+        if (floor.getBeginTimestamp() + floor.getDurationMillis() > timestamp) 
{
           results.add(new AbstractMap.SimpleEntry<>(floor, 
map.get(floor).get(key)));
         }
       }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5131bee0/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
index a33133b..b01fe61 100644
--- 
a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
@@ -60,12 +60,13 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, 
AccumT, OutputValT>
       KeyT key = ((KeyValPair<KeyT, ?>)inputTuple.getValue()).getKey();
       WindowOption.SessionWindows sessionWindowOption = 
(WindowOption.SessionWindows)windowOption;
       SessionWindowedStorage<KeyT, AccumT> sessionStorage = 
(SessionWindowedStorage<KeyT, AccumT>)dataStorage;
-      Collection<Map.Entry<Window.SessionWindow<KeyT>, AccumT>> sessionEntries 
= sessionStorage.getSessionEntries(key, timestamp, 
sessionWindowOption.getMinGap().getMillis());
+      long minGapMillis = sessionWindowOption.getMinGap().getMillis();
+      Collection<Map.Entry<Window.SessionWindow<KeyT>, AccumT>> sessionEntries 
= sessionStorage.getSessionEntries(key, timestamp, minGapMillis);
       Window.SessionWindow<KeyT> sessionWindowToAssign;
       switch (sessionEntries.size()) {
         case 0: {
           // There are no existing windows within the minimum gap. Create a 
new session window
-          Window.SessionWindow<KeyT> sessionWindow = new 
Window.SessionWindow<>(key, timestamp, 1);
+          Window.SessionWindow<KeyT> sessionWindow = new 
Window.SessionWindow<>(key, timestamp, minGapMillis);
           windowStateMap.put(sessionWindow, new WindowState());
           sessionWindowToAssign = sessionWindow;
           break;
@@ -74,7 +75,7 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, 
AccumT, OutputValT>
           // There is already one existing window within the minimum gap. See 
whether we need to extend the time of that window
           Map.Entry<Window.SessionWindow<KeyT>, AccumT> sessionWindowEntry = 
sessionEntries.iterator().next();
           Window.SessionWindow<KeyT> sessionWindow = 
sessionWindowEntry.getKey();
-          if (sessionWindow.getBeginTimestamp() <= timestamp && timestamp < 
sessionWindow.getBeginTimestamp() + sessionWindow.getDurationMillis()) {
+          if (sessionWindow.getBeginTimestamp() <= timestamp && timestamp + 
minGapMillis <= sessionWindow.getBeginTimestamp() + 
sessionWindow.getDurationMillis()) {
             // The session window already covers the event
             sessionWindowToAssign = sessionWindow;
           } else {
@@ -86,7 +87,7 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, 
AccumT, OutputValT>
             }
             // create a new session window that covers the timestamp
             long newBeginTimestamp = 
Math.min(sessionWindow.getBeginTimestamp(), timestamp);
-            long newEndTimestamp = Math.max(sessionWindow.getBeginTimestamp() 
+ sessionWindow.getDurationMillis(), timestamp + 1);
+            long newEndTimestamp = Math.max(sessionWindow.getBeginTimestamp() 
+ sessionWindow.getDurationMillis(), timestamp + minGapMillis);
             Window.SessionWindow<KeyT> newSessionWindow =
                 new Window.SessionWindow<>(key, newBeginTimestamp, 
newEndTimestamp - newBeginTimestamp);
             windowStateMap.remove(sessionWindow);
@@ -97,7 +98,7 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, 
AccumT, OutputValT>
           break;
         }
         case 2: {
-          // There are two windows that fall within the minimum gap of the 
timestamp. We need to merge the two windows
+          // There are two windows that overlap the proto-session window of 
the timestamp. We need to merge the two windows
           Iterator<Map.Entry<Window.SessionWindow<KeyT>, AccumT>> iterator = 
sessionEntries.iterator();
           Map.Entry<Window.SessionWindow<KeyT>, AccumT> sessionWindowEntry1 = 
iterator.next();
           Map.Entry<Window.SessionWindow<KeyT>, AccumT> sessionWindowEntry2 = 
iterator.next();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5131bee0/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
index 8779739..5d39930 100644
--- 
a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
@@ -106,7 +106,7 @@ public class SpillableSessionWindowedStorage<K, V> extends 
SpillableWindowedKeye
     if (sessionWindows != null) {
       for (Window.SessionWindow<K> window : sessionWindows) {
         if (timestamp > window.getBeginTimestamp()) {
-          if (window.getBeginTimestamp() + window.getDurationMillis() + gap > 
timestamp) {
+          if (window.getBeginTimestamp() + window.getDurationMillis() > 
timestamp) {
             results.add(new AbstractMap.SimpleEntry<>(window, 
windowKeyToValueMap.get(new ImmutablePair<Window, K>(window, key))));
           }
         } else if (timestamp < window.getBeginTimestamp()) {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5131bee0/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
index 15aba82..4a1cef0 100644
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
@@ -438,7 +438,7 @@ public class WindowedOperatorTest
     Assert.assertEquals(1, out.getWindows().size());
     Window.SessionWindow<String> window1 = 
(Window.SessionWindow<String>)out.getWindows().iterator().next();
     Assert.assertEquals(1100L, window1.getBeginTimestamp());
-    Assert.assertEquals(1, window1.getDurationMillis());
+    Assert.assertEquals(2000, window1.getDurationMillis());
     Assert.assertEquals("a", window1.getKey());
     Assert.assertEquals("a", out.getValue().getKey());
     Assert.assertEquals(2L, out.getValue().getValue().longValue());
@@ -461,7 +461,7 @@ public class WindowedOperatorTest
     Window.SessionWindow<String> window2 = 
(Window.SessionWindow<String>)out.getWindows().iterator().next();
 
     Assert.assertEquals(1100L, window2.getBeginTimestamp());
-    Assert.assertEquals(901, window2.getDurationMillis());
+    Assert.assertEquals(2900, window2.getDurationMillis());
     Assert.assertEquals("a", out.getValue().getKey());
     Assert.assertEquals(5L, out.getValue().getValue().longValue());
     sink.clear();
@@ -474,7 +474,7 @@ public class WindowedOperatorTest
     Assert.assertEquals(1, out.getWindows().size());
     Window.SessionWindow<String> window3 = 
(Window.SessionWindow<String>)out.getWindows().iterator().next();
     Assert.assertEquals(5000L, window3.getBeginTimestamp());
-    Assert.assertEquals(1, window3.getDurationMillis());
+    Assert.assertEquals(2000, window3.getDurationMillis());
     Assert.assertEquals("a", out.getValue().getKey());
     Assert.assertEquals(4L, out.getValue().getValue().longValue());
     sink.clear();
@@ -510,7 +510,7 @@ public class WindowedOperatorTest
     Assert.assertEquals(1, out.getWindows().size());
     Window.SessionWindow<String> window4 = 
(Window.SessionWindow<String>)out.getWindows().iterator().next();
     Assert.assertEquals(1100L, window4.getBeginTimestamp());
-    Assert.assertEquals(3901, window4.getDurationMillis());
+    Assert.assertEquals(5900, window4.getDurationMillis());
     Assert.assertEquals("a", out.getValue().getKey());
     Assert.assertEquals(12L, out.getValue().getValue().longValue());
 

Reply via email to