Repository: apex-malhar
Updated Branches:
  refs/heads/master 2f308aa21 -> 2cf8bade8


APEXMALHAR-2307 #resolve delete the windows properly after merge or extend


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/2cf8bade
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/2cf8bade
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/2cf8bade

Branch: refs/heads/master
Commit: 2cf8bade8f2b4505a915ff3cfca79424f5ec2df1
Parents: 5131bee
Author: David Yan <da...@datatorrent.com>
Authored: Tue Oct 18 18:18:58 2016 -0700
Committer: Siyuan Hua <hsy...@apache.org>
Committed: Wed Oct 19 12:36:20 2016 -0700

----------------------------------------------------------------------
 .../impl/InMemorySessionWindowedStorage.java    | 56 ++++++++++++++------
 .../impl/SpillableSessionWindowedStorage.java   | 27 +++++-----
 2 files changed, 51 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2cf8bade/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java
index 0247cbc..b696937 100644
--- 
a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java
@@ -24,12 +24,18 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.SortedSet;
 import java.util.TreeSet;
 
 import org.apache.apex.malhar.lib.window.SessionWindowedStorage;
 import org.apache.apex.malhar.lib.window.Window;
 import org.apache.hadoop.classification.InterfaceStability;
 
+import com.google.common.base.Supplier;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.SortedSetMultimap;
+
 /**
  * This is the in-memory implementation of {@link WindowedKeyedStorage}. Do 
not use this class if you have a large state that
  * can't be fit in memory.
@@ -40,7 +46,14 @@ import org.apache.hadoop.classification.InterfaceStability;
 public class InMemorySessionWindowedStorage<K, V> extends 
InMemoryWindowedKeyedStorage<K, V>
     implements SessionWindowedStorage<K, V>
 {
-  private Map<K, TreeSet<Window.SessionWindow<K>>> keyToWindows = new 
HashMap<>();
+  private SortedSetMultimap<K, Window.SessionWindow<K>> keyToWindows = 
Multimaps.newSortedSetMultimap(new HashMap<K, 
Collection<Window.SessionWindow<K>>>(), new 
Supplier<SortedSet<Window.SessionWindow<K>>>()
+  {
+    @Override
+    public SortedSet<Window.SessionWindow<K>> get()
+    {
+      return new TreeSet<>();
+    }
+  });
 
   @Override
   public void put(Window window, K key, V value)
@@ -48,37 +61,46 @@ public class InMemorySessionWindowedStorage<K, V> extends 
InMemoryWindowedKeyedS
     @SuppressWarnings("unchecked")
     Window.SessionWindow<K> sessionWindow = (Window.SessionWindow<K>)window;
     super.put(window, key, value);
-    TreeSet<Window.SessionWindow<K>> sessionWindows = keyToWindows.get(key);
-    if (sessionWindows == null) {
-      sessionWindows = new TreeSet<>();
-      keyToWindows.put(key, sessionWindows);
-    }
-    sessionWindows.add(sessionWindow);
+    keyToWindows.put(key, sessionWindow);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void remove(Window window)
+  {
+    super.remove(window);
+    Window.SessionWindow<K> sessionWindow = (Window.SessionWindow<K>)window;
+    keyToWindows.remove(sessionWindow.getKey(), sessionWindow);
   }
 
   @Override
   public void migrateWindow(Window.SessionWindow<K> fromWindow, 
Window.SessionWindow<K> toWindow)
   {
-    if (containsWindow(fromWindow)) {
-      map.put(toWindow, map.remove(fromWindow));
+    if (!containsWindow(fromWindow)) {
+      throw new NoSuchElementException();
     }
+    map.put(toWindow, map.remove(fromWindow));
+    keyToWindows.remove(fromWindow.getKey(), fromWindow);
+    keyToWindows.put(toWindow.getKey(), toWindow);
   }
 
   @Override
   public Collection<Map.Entry<Window.SessionWindow<K>, V>> getSessionEntries(K 
key, long timestamp, long gap)
   {
     List<Map.Entry<Window.SessionWindow<K>, V>> results = new ArrayList<>();
-    TreeSet<Window.SessionWindow<K>> sessionWindows = keyToWindows.get(key);
+    SortedSet<Window.SessionWindow<K>> sessionWindows = keyToWindows.get(key);
     if (sessionWindows != null) {
-      Window.SessionWindow<K> refWindow = new Window.SessionWindow<>(key, 
timestamp, 1);
-      Window.SessionWindow<K> floor = sessionWindows.floor(refWindow);
-      if (floor != null) {
-        if (floor.getBeginTimestamp() + floor.getDurationMillis() > timestamp) 
{
-          results.add(new AbstractMap.SimpleEntry<>(floor, 
map.get(floor).get(key)));
+      Window.SessionWindow<K> refWindow = new Window.SessionWindow<>(key, 
timestamp, gap);
+      SortedSet<Window.SessionWindow<K>> headSet = 
sessionWindows.headSet(refWindow);
+      if (!headSet.isEmpty()) {
+        Window.SessionWindow<K> lower = headSet.last();
+        if (lower.getBeginTimestamp() + lower.getDurationMillis() > timestamp) 
{
+          results.add(new AbstractMap.SimpleEntry<>(lower, 
map.get(lower).get(key)));
         }
       }
-      Window.SessionWindow<K> higher = sessionWindows.higher(refWindow);
-      if (higher != null) {
+      SortedSet<Window.SessionWindow<K>> tailSet = 
sessionWindows.tailSet(refWindow);
+      if (!tailSet.isEmpty()) {
+        Window.SessionWindow<K> higher = tailSet.first();
         if (higher.getBeginTimestamp() - gap <= timestamp) {
           results.add(new AbstractMap.SimpleEntry<>(higher, 
map.get(higher).get(key)));
         }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2cf8bade/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
index 5d39930..da44fb1 100644
--- 
a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Set;
 
 import org.apache.apex.malhar.lib.state.spillable.Spillable;
@@ -79,23 +80,19 @@ public class SpillableSessionWindowedStorage<K, V> extends 
SpillableWindowedKeye
   @Override
   public void migrateWindow(Window.SessionWindow<K> fromWindow, 
Window.SessionWindow<K> toWindow)
   {
-    Set<K> keys = windowToKeysMap.get(fromWindow);
-    if (keys == null) {
-      return;
-    }
-    windowKeyToValueMap.remove(toWindow);
-    for (K key : keys) {
-      windowToKeysMap.put(toWindow, key);
-      ImmutablePair<Window, K> oldKey = new ImmutablePair<Window, 
K>(fromWindow, key);
-      ImmutablePair<Window, K> newKey = new ImmutablePair<Window, K>(toWindow, 
key);
-
-      V value = windowKeyToValueMap.get(oldKey);
-      windowKeyToValueMap.remove(oldKey);
-      windowKeyToValueMap.put(newKey, value);
-      keyToWindowsMap.remove(key, fromWindow);
-      keyToWindowsMap.put(key, toWindow);
+    K key = fromWindow.getKey();
+    ImmutablePair<Window, K> oldKey = new ImmutablePair<Window, K>(fromWindow, 
key);
+    ImmutablePair<Window, K> newKey = new ImmutablePair<Window, K>(toWindow, 
key);
+    V value = windowKeyToValueMap.get(oldKey);
+    if (value == null) {
+      throw new NoSuchElementException();
     }
+    windowKeyToValueMap.remove(oldKey);
+    windowKeyToValueMap.put(newKey, value);
+    keyToWindowsMap.remove(key, fromWindow);
+    keyToWindowsMap.put(key, toWindow);
     windowToKeysMap.removeAll(fromWindow);
+    windowToKeysMap.put(toWindow, key);
   }
 
   @Override

Reply via email to