Repository: apex-malhar Updated Branches: refs/heads/master 2f308aa21 -> 2cf8bade8
APEXMALHAR-2307 #resolve delete the windows properly after merge or extend Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/2cf8bade Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/2cf8bade Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/2cf8bade Branch: refs/heads/master Commit: 2cf8bade8f2b4505a915ff3cfca79424f5ec2df1 Parents: 5131bee Author: David Yan <da...@datatorrent.com> Authored: Tue Oct 18 18:18:58 2016 -0700 Committer: Siyuan Hua <hsy...@apache.org> Committed: Wed Oct 19 12:36:20 2016 -0700 ---------------------------------------------------------------------- .../impl/InMemorySessionWindowedStorage.java | 56 ++++++++++++++------ .../impl/SpillableSessionWindowedStorage.java | 27 +++++----- 2 files changed, 51 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2cf8bade/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java index 0247cbc..b696937 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java @@ -24,12 +24,18 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; +import java.util.SortedSet; import java.util.TreeSet; import org.apache.apex.malhar.lib.window.SessionWindowedStorage; import org.apache.apex.malhar.lib.window.Window; import org.apache.hadoop.classification.InterfaceStability; +import com.google.common.base.Supplier; +import com.google.common.collect.Multimaps; +import com.google.common.collect.SortedSetMultimap; + /** * This is the in-memory implementation of {@link WindowedKeyedStorage}. Do not use this class if you have a large state that * can't be fit in memory. @@ -40,7 +46,14 @@ import org.apache.hadoop.classification.InterfaceStability; public class InMemorySessionWindowedStorage<K, V> extends InMemoryWindowedKeyedStorage<K, V> implements SessionWindowedStorage<K, V> { - private Map<K, TreeSet<Window.SessionWindow<K>>> keyToWindows = new HashMap<>(); + private SortedSetMultimap<K, Window.SessionWindow<K>> keyToWindows = Multimaps.newSortedSetMultimap(new HashMap<K, Collection<Window.SessionWindow<K>>>(), new Supplier<SortedSet<Window.SessionWindow<K>>>() + { + @Override + public SortedSet<Window.SessionWindow<K>> get() + { + return new TreeSet<>(); + } + }); @Override public void put(Window window, K key, V value) @@ -48,37 +61,46 @@ public class InMemorySessionWindowedStorage<K, V> extends InMemoryWindowedKeyedS @SuppressWarnings("unchecked") Window.SessionWindow<K> sessionWindow = (Window.SessionWindow<K>)window; super.put(window, key, value); - TreeSet<Window.SessionWindow<K>> sessionWindows = keyToWindows.get(key); - if (sessionWindows == null) { - sessionWindows = new TreeSet<>(); - keyToWindows.put(key, sessionWindows); - } - sessionWindows.add(sessionWindow); + keyToWindows.put(key, sessionWindow); + } + + @Override + @SuppressWarnings("unchecked") + public void remove(Window window) + { + super.remove(window); + Window.SessionWindow<K> sessionWindow = (Window.SessionWindow<K>)window; + keyToWindows.remove(sessionWindow.getKey(), sessionWindow); } @Override public void migrateWindow(Window.SessionWindow<K> fromWindow, Window.SessionWindow<K> toWindow) { - if (containsWindow(fromWindow)) { - map.put(toWindow, map.remove(fromWindow)); + if (!containsWindow(fromWindow)) { + throw new NoSuchElementException(); } + map.put(toWindow, map.remove(fromWindow)); + keyToWindows.remove(fromWindow.getKey(), fromWindow); + keyToWindows.put(toWindow.getKey(), toWindow); } @Override public Collection<Map.Entry<Window.SessionWindow<K>, V>> getSessionEntries(K key, long timestamp, long gap) { List<Map.Entry<Window.SessionWindow<K>, V>> results = new ArrayList<>(); - TreeSet<Window.SessionWindow<K>> sessionWindows = keyToWindows.get(key); + SortedSet<Window.SessionWindow<K>> sessionWindows = keyToWindows.get(key); if (sessionWindows != null) { - Window.SessionWindow<K> refWindow = new Window.SessionWindow<>(key, timestamp, 1); - Window.SessionWindow<K> floor = sessionWindows.floor(refWindow); - if (floor != null) { - if (floor.getBeginTimestamp() + floor.getDurationMillis() > timestamp) { - results.add(new AbstractMap.SimpleEntry<>(floor, map.get(floor).get(key))); + Window.SessionWindow<K> refWindow = new Window.SessionWindow<>(key, timestamp, gap); + SortedSet<Window.SessionWindow<K>> headSet = sessionWindows.headSet(refWindow); + if (!headSet.isEmpty()) { + Window.SessionWindow<K> lower = headSet.last(); + if (lower.getBeginTimestamp() + lower.getDurationMillis() > timestamp) { + results.add(new AbstractMap.SimpleEntry<>(lower, map.get(lower).get(key))); } } - Window.SessionWindow<K> higher = sessionWindows.higher(refWindow); - if (higher != null) { + SortedSet<Window.SessionWindow<K>> tailSet = sessionWindows.tailSet(refWindow); + if (!tailSet.isEmpty()) { + Window.SessionWindow<K> higher = tailSet.first(); if (higher.getBeginTimestamp() - gap <= timestamp) { results.add(new AbstractMap.SimpleEntry<>(higher, map.get(higher).get(key))); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2cf8bade/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java index 5d39930..da44fb1 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Set; import org.apache.apex.malhar.lib.state.spillable.Spillable; @@ -79,23 +80,19 @@ public class SpillableSessionWindowedStorage<K, V> extends SpillableWindowedKeye @Override public void migrateWindow(Window.SessionWindow<K> fromWindow, Window.SessionWindow<K> toWindow) { - Set<K> keys = windowToKeysMap.get(fromWindow); - if (keys == null) { - return; - } - windowKeyToValueMap.remove(toWindow); - for (K key : keys) { - windowToKeysMap.put(toWindow, key); - ImmutablePair<Window, K> oldKey = new ImmutablePair<Window, K>(fromWindow, key); - ImmutablePair<Window, K> newKey = new ImmutablePair<Window, K>(toWindow, key); - - V value = windowKeyToValueMap.get(oldKey); - windowKeyToValueMap.remove(oldKey); - windowKeyToValueMap.put(newKey, value); - keyToWindowsMap.remove(key, fromWindow); - keyToWindowsMap.put(key, toWindow); + K key = fromWindow.getKey(); + ImmutablePair<Window, K> oldKey = new ImmutablePair<Window, K>(fromWindow, key); + ImmutablePair<Window, K> newKey = new ImmutablePair<Window, K>(toWindow, key); + V value = windowKeyToValueMap.get(oldKey); + if (value == null) { + throw new NoSuchElementException(); } + windowKeyToValueMap.remove(oldKey); + windowKeyToValueMap.put(newKey, value); + keyToWindowsMap.remove(key, fromWindow); + keyToWindowsMap.put(key, toWindow); windowToKeysMap.removeAll(fromWindow); + windowToKeysMap.put(toWindow, key); } @Override