Repository: apex-malhar Updated Branches: refs/heads/master 763d14fca -> c19c80d88
APEXMALHAR-2130 Spillable implementation for WindowedOperator Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c19c80d8 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c19c80d8 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c19c80d8 Branch: refs/heads/master Commit: c19c80d8882fd530926093d8ce6b81c0503febc3 Parents: 763d14f Author: David Yan <[email protected]> Authored: Mon Aug 15 14:19:08 2016 -0700 Committer: Siyuan Hua <[email protected]> Committed: Tue Sep 27 09:47:49 2016 -0700 ---------------------------------------------------------------------- .../spillable/SpillableSetMultimapImpl.java | 8 +- .../apex/malhar/lib/window/Accumulation.java | 22 +- .../apex/malhar/lib/window/ControlTuple.java | 2 +- .../lib/window/SessionWindowedStorage.java | 8 +- .../apex/malhar/lib/window/TriggerOption.java | 32 +-- .../apache/apex/malhar/lib/window/Tuple.java | 5 +- .../apache/apex/malhar/lib/window/Window.java | 5 +- .../apex/malhar/lib/window/WindowOption.java | 2 +- .../malhar/lib/window/WindowedOperator.java | 27 +- .../apex/malhar/lib/window/WindowedStorage.java | 32 +-- .../window/impl/AbstractWindowedOperator.java | 27 +- .../impl/InMemorySessionWindowedStorage.java | 4 +- .../impl/SpillableSessionWindowedStorage.java | 123 +++++++++ .../impl/SpillableWindowedKeyedStorage.java | 224 ++++++++++++++++ .../impl/SpillableWindowedPlainStorage.java | 146 +++++++++++ .../lib/helper/OperatorContextTestHelper.java | 5 + .../window/SpillableWindowedStorageTest.java | 150 +++++++++++ .../malhar/lib/window/WindowedOperatorTest.java | 261 ++++++++++++------- 18 files changed, 904 insertions(+), 179 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java index 951ef76..c227ed7 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java @@ -167,7 +167,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul if (spillableSet != null) { cache.remove((K)key); Slice keySlice = SliceUtils.concatenate(serdeKey.serialize((K)key), META_KEY_SUFFIX); - map.remove(keySlice); + map.put(keySlice, new ImmutablePair<>(0, spillableSet.getHead())); spillableSet.clear(); removedSets.add(spillableSet); } @@ -200,7 +200,8 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul return true; } Slice keySlice = SliceUtils.concatenate(serdeKey.serialize((K)key), META_KEY_SUFFIX); - return map.containsKey(keySlice); + Pair<Integer, V> meta = map.get(keySlice); + return meta != null && meta.getLeft() > 0; } @Override @@ -230,8 +231,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, serdeValue); cache.put(key, spillableSet); } - spillableSet.add(value); - return true; + return spillableSet.add(value); } @Override http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java b/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java index 03f7ff7..44814d0 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java @@ -37,33 +37,33 @@ public interface Accumulation<InputT, AccumT, OutputT> /** * Returns the default accumulated value when nothing has been accumulated * - * @return + * @return the default accumulated value */ AccumT defaultAccumulatedValue(); /** * Accumulates the input to the accumulated value * - * @param accumulatedValue - * @param input - * @return + * @param accumulatedValue the accumulated value + * @param input the input value + * @return the result accumulated value */ AccumT accumulate(AccumT accumulatedValue, InputT input); /** * Merges two accumulated values into one * - * @param accumulatedValue1 - * @param accumulatedValue2 - * @return + * @param accumulatedValue1 the first accumulated value + * @param accumulatedValue2 the second accumulated value + * @return the result accumulated value */ AccumT merge(AccumT accumulatedValue1, AccumT accumulatedValue2); /** * Gets the output of the accumulated value. This is used for generating the data for triggers * - * @param accumulatedValue - * @return + * @param accumulatedValue the accumulated value + * @return the output */ OutputT getOutput(AccumT accumulatedValue); @@ -71,8 +71,8 @@ public interface Accumulation<InputT, AccumT, OutputT> * Gets the retraction of the value. This is used for retracting previous panes in * ACCUMULATING_AND_RETRACTING accumulation mode * - * @param value - * @return + * @param value the value to be retracted + * @return the retracted value */ OutputT getRetraction(OutputT value); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/ControlTuple.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/ControlTuple.java b/library/src/main/java/org/apache/apex/malhar/lib/window/ControlTuple.java index 3288398..69093e5 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/ControlTuple.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/ControlTuple.java @@ -37,7 +37,7 @@ public interface ControlTuple /** * Gets the timestamp associated with this watermark * - * @return + * @return the timestamp */ long getTimestamp(); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java index 4cb2b1a..3e25d15 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java @@ -38,8 +38,8 @@ public interface SessionWindowedStorage<K, V> extends WindowedStorage.WindowedKe * Migrate the data from one window to another. This will invalidate fromWindow in the storage and move the * data to toWindow, and overwrite any existing data in toWindow * - * @param fromWindow - * @param toWindow + * @param fromWindow the window we want to migrate from + * @param toWindow the window we want to migrate to */ void migrateWindow(Window.SessionWindow<K> fromWindow, Window.SessionWindow<K> toWindow); @@ -51,8 +51,8 @@ public interface SessionWindowedStorage<K, V> extends WindowedStorage.WindowedKe * * @param key the key * @param timestamp the timestamp - * @param gap - * @return + * @param gap the minimum gap + * @return the windows */ Collection<Map.Entry<Window.SessionWindow<K>, V>> getSessionEntries(K key, long timestamp, long gap); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/TriggerOption.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/TriggerOption.java b/library/src/main/java/org/apache/apex/malhar/lib/window/TriggerOption.java index 266577f..bd9cd9c 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/TriggerOption.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/TriggerOption.java @@ -138,7 +138,7 @@ public class TriggerOption /** * Creates a TriggerOption with an initial trigger that should be fired at the watermark * - * @return + * @return the TriggerOption */ public static TriggerOption AtWatermark() { @@ -151,8 +151,8 @@ public class TriggerOption /** * A trigger should be fired before the watermark once for every specified duration * - * @param duration - * @return + * @param duration the duration + * @return the TriggerOption */ public TriggerOption withEarlyFiringsAtEvery(Duration duration) { @@ -164,8 +164,8 @@ public class TriggerOption /** * A trigger should be fired before the watermark once for every n tuple(s) * - * @param count - * @return + * @param count the count + * @return the TriggerOption */ public TriggerOption withEarlyFiringsAtEvery(long count) { @@ -177,8 +177,8 @@ public class TriggerOption /** * A trigger should be fired after the watermark once for every specified duration * - * @param duration - * @return + * @param duration the duration + * @return the TriggerOption */ public TriggerOption withLateFiringsAtEvery(Duration duration) { @@ -190,8 +190,8 @@ public class TriggerOption /** * A trigger should be fired after the watermark once for every n late tuple(s) * - * @param count - * @return + * @param count the count + * @return the TriggerOption */ public TriggerOption withLateFiringsAtEvery(long count) { @@ -203,7 +203,7 @@ public class TriggerOption /** * With discarding mode, the state is discarded after each trigger * - * @return + * @return the TriggerOption */ public TriggerOption discardingFiredPanes() { @@ -214,7 +214,7 @@ public class TriggerOption /** * With accumulating mode, the state is kept * - * @return + * @return the TriggerOption */ public TriggerOption accumulatingFiredPanes() { @@ -227,7 +227,7 @@ public class TriggerOption * so when new values come in that change the state, a retraction trigger can be fired with the snapshot of the state * when the last trigger was fired * - * @return + * @return the TriggerOption */ public TriggerOption accumulatingAndRetractingFiredPanes() { @@ -239,7 +239,7 @@ public class TriggerOption * Only fire triggers for data that has changed from the last trigger. This only applies to ACCUMULATING and * ACCUMULATING_AND_RETRACTING accumulation modes. * - * @return + * @return the TriggerOption */ public TriggerOption firingOnlyUpdatedPanes() { @@ -250,7 +250,7 @@ public class TriggerOption /** * Gets the accumulation mode * - * @return + * @return the AccumulationMode */ public AccumulationMode getAccumulationMode() { @@ -260,7 +260,7 @@ public class TriggerOption /** * Gets the trigger list * - * @return + * @return the trigger list */ public List<Trigger> getTriggerList() { @@ -271,7 +271,7 @@ public class TriggerOption * Returns whether we should only fire panes that have been updated since the last trigger. * When this option is set, DISCARDING accumulation mode must not be used. * - * @return + * @return whether we want to fire only updated panes */ public boolean isFiringOnlyUpdatedPanes() { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java b/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java index aea6bf6..c7eba4e 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java @@ -35,7 +35,7 @@ public interface Tuple<T> /** * Gets the value of the tuple * - * @return + * @return the value */ T getValue(); @@ -81,6 +81,7 @@ public interface Tuple<T> } @Override + @SuppressWarnings("unchecked") public boolean equals(Object obj) { if (obj instanceof PlainTuple) { @@ -133,6 +134,7 @@ public interface Tuple<T> } @Override + @SuppressWarnings("unchecked") public boolean equals(Object obj) { if (obj instanceof TimestampedTuple && super.equals(obj)) { @@ -186,6 +188,7 @@ public interface Tuple<T> } @Override + @SuppressWarnings("unchecked") public boolean equals(Object obj) { if (obj instanceof WindowedTuple && super.equals(obj)) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java b/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java index 50d6445..1d7681d 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java @@ -105,7 +105,7 @@ public interface Window<WINDOW extends Comparable<WINDOW>> extends Comparable<WI /** * Gets the beginning timestamp of this window * - * @return + * @return the begin timestamp */ @Override public long getBeginTimestamp() @@ -116,7 +116,7 @@ public interface Window<WINDOW extends Comparable<WINDOW>> extends Comparable<WI /** * Gets the duration millis of this window * - * @return + * @return the duration */ @Override public long getDurationMillis() @@ -198,6 +198,7 @@ public interface Window<WINDOW extends Comparable<WINDOW>> extends Comparable<WI return false; } if (other instanceof SessionWindow) { + @SuppressWarnings("unchecked") SessionWindow<K> otherSessionWindow = (SessionWindow<K>)other; if (key == null) { return otherSessionWindow.key == null; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java index 099709d..a88250e 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java @@ -73,7 +73,7 @@ public interface WindowOption /** * The time window should be a sliding window with the given slide duration * - * @param duration + * @param duration the slide by duration * @return the SlidingTimeWindows */ public SlidingTimeWindows slideBy(Duration duration) http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java index ccc7ae1..400a97b 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java @@ -42,36 +42,36 @@ public interface WindowedOperator<InputT> /** * Sets the WindowOption of this operator * - * @param windowOption + * @param windowOption the window option */ void setWindowOption(WindowOption windowOption); /** * Sets the TriggerOption of this operator * - * @param triggerOption + * @param triggerOption the trigger option */ void setTriggerOption(TriggerOption triggerOption); /** * Sets the allowed lateness of this operator * - * @param allowedLateness + * @param allowedLateness the allowed lateness */ void setAllowedLateness(Duration allowedLateness); /** * This sets the function that extracts the timestamp from the input tuple * - * @param timestampExtractor + * @param timestampExtractor the timestamp extractor */ void setTimestampExtractor(Function<InputT, Long> timestampExtractor); /** * Assign window(s) for this input tuple * - * @param input - * @return + * @param input the input tuple + * @return the windowed tuple */ Tuple.WindowedTuple<InputT> getWindowedValue(Tuple<InputT> input); @@ -80,8 +80,8 @@ public interface WindowedOperator<InputT> * The implementation of this operator should look at the allowed lateness in the WindowOption. * It should also call this function and if it returns true, it should drop the associated tuple. * - * @param timestamp - * @return + * @param timestamp the timestamp + * @return whether the timestamp is considered too late */ boolean isTooLate(long timestamp); @@ -89,14 +89,14 @@ public interface WindowedOperator<InputT> * This method is supposed to drop the tuple because it has passed the allowed lateness. But an implementation * of this method has the chance to do something different (e.g. emit it to another port) * - * @param input + * @param input the input tuple */ void dropTuple(Tuple<InputT> input); /** * This method accumulates the incoming tuple (with the Accumulation interface) * - * @param tuple + * @param tuple the input tuple */ void accumulateTuple(Tuple.WindowedTuple<InputT> tuple); @@ -106,7 +106,7 @@ public interface WindowedOperator<InputT> * and change the state of each of those windows. All tuples for those windows arriving after * the watermark will be considered late. * - * @param watermark + * @param watermark the watermark tuple */ void processWatermark(ControlTuple.Watermark watermark); @@ -114,14 +114,15 @@ public interface WindowedOperator<InputT> * This method fires the trigger for the given window, and possibly retraction trigger. The implementation should clear * the window data in the storage if the accumulation mode is DISCARDING * - * @param window + * @param window the window + * @param windowState the window state */ void fireTrigger(Window window, WindowState windowState); /** * This method clears the window data in the storage. * - * @param window + * @param window the window */ void clearWindowData(Window window); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java index 42ecdae..e2874ba 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java @@ -37,21 +37,21 @@ public interface WindowedStorage extends Component<Context.OperatorContext> /** * Returns true if the storage contains this window * - * @param window + * @param window the window */ boolean containsWindow(Window window); /** * Returns the number of windows in the storage * - * @return + * @return the number of windows */ long size(); /** * Removes all the data associated with the given window. This does NOT mean removing the window in checkpointed state * - * @param window + * @param window the window */ void remove(Window window); @@ -68,23 +68,23 @@ public interface WindowedStorage extends Component<Context.OperatorContext> /** * Sets the data associated with the given window * - * @param window - * @param value + * @param window the window + * @param value the value */ void put(Window window, T value); /** * Gets the value associated with the given window * - * @param window - * @return + * @param window the window + * @return the value */ T get(Window window); /** * Returns the iterable of the entries in the storage * - * @return + * @return the entries */ Iterable<Map.Entry<Window, T>> entries(); } @@ -100,26 +100,26 @@ public interface WindowedStorage extends Component<Context.OperatorContext> /** * Sets the data associated with the given window and the key * - * @param window - * @param key - * @param value + * @param window the window + * @param key the key + * @param value the value */ void put(Window window, K key, V value); /** * Gets an iterable object over the key/value pairs associated with the given window * - * @param window - * @return + * @param window the window + * @return the entries */ Iterable<Map.Entry<K, V>> entries(Window window); /** * Gets the data associated with the given window and the key * - * @param window - * @param key - * @return + * @param window the window + * @param key the key + * @return the value */ V get(Window window, K key); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java index f90d47d..c778523 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java @@ -131,7 +131,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext /** * Process the incoming data tuple * - * @param tuple + * @param tuple the incoming tuple */ public void processTuple(Tuple<InputT> tuple) { @@ -206,21 +206,21 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext /** * This method sets the storage for the data for each window * - * @param storageAgent + * @param dataStorage The data storage */ - public void setDataStorage(DataStorageT storageAgent) + public void setDataStorage(DataStorageT dataStorage) { - this.dataStorage = storageAgent; + this.dataStorage = dataStorage; } /** * This method sets the storage for the retraction data for each window. Only used when the accumulation mode is ACCUMULATING_AND_RETRACTING * - * @param storageAgent + * @param retractionStorage The retraction storage */ - public void setRetractionStorage(RetractionStorageT storageAgent) + public void setRetractionStorage(RetractionStorageT retractionStorage) { - this.retractionStorage = storageAgent; + this.retractionStorage = retractionStorage; } public void addComponent(String key, Component<Context.OperatorContext> component) @@ -232,7 +232,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext * Sets the accumulation, which basically tells the WindowedOperator what to do if a new tuple comes in and what * to put in the pane when a trigger is fired * - * @param accumulation + * @param accumulation the accumulation */ public void setAccumulation(AccumulationT accumulation) { @@ -345,8 +345,8 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext * If we are doing sliding windows, this will return multiple windows. Otherwise, only one window will be returned. * Note that this method does not apply to SessionWindows. * - * @param timestamp - * @return + * @param timestamp the timestamp + * @return the windows this timestamp belongs to */ private Collection<Window.TimeWindow> getTimeWindowsForTimestamp(long timestamp) { @@ -378,7 +378,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext @Override public boolean isTooLate(long timestamp) { - return allowedLatenessMillis < 0 ? false : (timestamp < currentWatermark - allowedLatenessMillis); + return allowedLatenessMillis >= 0 && (timestamp < currentWatermark - allowedLatenessMillis); } @Override @@ -395,6 +395,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext } @Override + @SuppressWarnings("unchecked") public void setup(Context.OperatorContext context) { this.timeIncrement = context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) * context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS); @@ -537,7 +538,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext /** * This method fires the normal trigger for the given window. * - * @param window + * @param window the window to fire trigger on * @param fireOnlyUpdatedPanes Do not fire trigger if the old value is the same as the new value. If true, retraction storage is required. */ public abstract void fireNormalTrigger(Window window, boolean fireOnlyUpdatedPanes); @@ -546,7 +547,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext * This method fires the retraction trigger for the given window. This should only be valid if the accumulation * mode is ACCUMULATING_AND_RETRACTING * - * @param window + * @param window the window to fire the retraction trigger on */ public abstract void fireRetractionTrigger(Window window); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java index fdceb4d..906b1b9 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java @@ -45,13 +45,15 @@ public class InMemorySessionWindowedStorage<K, V> extends InMemoryWindowedKeyedS @Override public void put(Window window, K key, V value) { + @SuppressWarnings("unchecked") + Window.SessionWindow<K> sessionWindow = (Window.SessionWindow<K>)window; super.put(window, key, value); TreeSet<Window.SessionWindow<K>> sessionWindows = keyToWindows.get(key); if (sessionWindows == null) { sessionWindows = new TreeSet<>(); keyToWindows.put(key, sessionWindows); } - sessionWindows.add((Window.SessionWindow<K>)window); + sessionWindows.add(sessionWindow); } @Override http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java new file mode 100644 index 0000000..8779739 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.impl; + +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.apex.malhar.lib.window.SessionWindowedStorage; +import org.apache.apex.malhar.lib.window.Window; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.api.Context; +import com.datatorrent.netlet.util.Slice; + +/** + * Spillable session windowed storage. + */ [email protected] +public class SpillableSessionWindowedStorage<K, V> extends SpillableWindowedKeyedStorage<K, V> implements SessionWindowedStorage<K, V> +{ + // additional key to windows map for fast lookup of windows using key + private Spillable.SpillableSetMultimap<K, Window.SessionWindow<K>> keyToWindowsMap; + + @Override + @SuppressWarnings("unchecked") + public void setup(Context.OperatorContext context) + { + super.setup(context); + if (keyToWindowsMap == null) { + // NOTE: this will pose difficulties when we try to assign the entries to a time bucket later on. + // This is logged in APEXMALHAR-2271 + keyToWindowsMap = scc.newSpillableSetMultimap(bucket, keySerde, (Serde<Window.SessionWindow<K>, Slice>)(Serde)windowSerde); + } + } + + @Override + @SuppressWarnings("unchecked") + public void remove(Window window) + { + super.remove(window); + Window.SessionWindow<K> sessionWindow = (Window.SessionWindow<K>)window; + keyToWindowsMap.remove(sessionWindow.getKey(), sessionWindow); + } + + @Override + public void put(Window window, K key, V value) + { + super.put(window, key, value); + @SuppressWarnings("unchecked") + Window.SessionWindow<K> sessionWindow = (Window.SessionWindow<K>)window; + if (!keyToWindowsMap.containsEntry(key, sessionWindow)) { + keyToWindowsMap.put(key, sessionWindow); + } + } + + @Override + public void migrateWindow(Window.SessionWindow<K> fromWindow, Window.SessionWindow<K> toWindow) + { + Set<K> keys = windowToKeysMap.get(fromWindow); + if (keys == null) { + return; + } + windowKeyToValueMap.remove(toWindow); + for (K key : keys) { + windowToKeysMap.put(toWindow, key); + ImmutablePair<Window, K> oldKey = new ImmutablePair<Window, K>(fromWindow, key); + ImmutablePair<Window, K> newKey = new ImmutablePair<Window, K>(toWindow, key); + + V value = windowKeyToValueMap.get(oldKey); + windowKeyToValueMap.remove(oldKey); + windowKeyToValueMap.put(newKey, value); + keyToWindowsMap.remove(key, fromWindow); + keyToWindowsMap.put(key, toWindow); + } + windowToKeysMap.removeAll(fromWindow); + } + + @Override + public Collection<Map.Entry<Window.SessionWindow<K>, V>> getSessionEntries(K key, long timestamp, long gap) + { + List<Map.Entry<Window.SessionWindow<K>, V>> results = new ArrayList<>(); + Set<Window.SessionWindow<K>> sessionWindows = keyToWindowsMap.get(key); + if (sessionWindows != null) { + for (Window.SessionWindow<K> window : sessionWindows) { + if (timestamp > window.getBeginTimestamp()) { + if (window.getBeginTimestamp() + window.getDurationMillis() + gap > timestamp) { + results.add(new AbstractMap.SimpleEntry<>(window, windowKeyToValueMap.get(new ImmutablePair<Window, K>(window, key)))); + } + } else if (timestamp < window.getBeginTimestamp()) { + if (window.getBeginTimestamp() - gap <= timestamp) { + results.add(new AbstractMap.SimpleEntry<>(window, windowKeyToValueMap.get(new ImmutablePair<Window, K>(window, key)))); + } + } else { + results.add(new AbstractMap.SimpleEntry<>(window, windowKeyToValueMap.get(new ImmutablePair<Window, K>(window, key)))); + } + } + } + return results; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java new file mode 100644 index 0000000..ac77d1b --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.impl; + +import java.util.AbstractMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.apex.malhar.lib.utils.serde.SerdeKryoSlice; +import org.apache.apex.malhar.lib.window.Window; +import org.apache.apex.malhar.lib.window.WindowedStorage; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; + +import com.datatorrent.api.Context; +import com.datatorrent.netlet.util.Slice; + +/** + * Implementation of WindowedKeyedStorage using {@link Spillable} data structures + * + * @param <K> The key type + * @param <V> The value type + */ +public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.WindowedKeyedStorage<K, V> +{ + @NotNull + protected SpillableComplexComponent scc; + protected long bucket; + protected Serde<Window, Slice> windowSerde; + protected Serde<Pair<Window, K>, Slice> windowKeyPairSerde; + protected Serde<K, Slice> keySerde; + protected Serde<V, Slice> valueSerde; + + protected Spillable.SpillableByteMap<Pair<Window, K>, V> windowKeyToValueMap; + protected Spillable.SpillableSetMultimap<Window, K> windowToKeysMap; + + private class KVIterator implements Iterator<Map.Entry<K, V>> + { + final Window window; + final Set<K> keys; + Iterator<K> iterator; + + KVIterator(Window window) + { + this.window = window; + this.keys = windowToKeysMap.get(window); + if (this.keys != null) { + this.iterator = this.keys.iterator(); + } + } + + @Override + public boolean hasNext() + { + return iterator != null && iterator.hasNext(); + } + + @Override + public Map.Entry<K, V> next() + { + K key = iterator.next(); + return new AbstractMap.SimpleEntry<>(key, windowKeyToValueMap.get(new ImmutablePair<>(window, key))); + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + } + + public SpillableWindowedKeyedStorage() + { + } + + public SpillableWindowedKeyedStorage(long bucket, + Serde<Window, Slice> windowSerde, Serde<Pair<Window, K>, Slice> windowKeyPairSerde, Serde<K, Slice> keySerde, Serde<V, Slice> valueSerde) + { + this.bucket = bucket; + this.windowSerde = windowSerde; + this.windowKeyPairSerde = windowKeyPairSerde; + this.keySerde = keySerde; + this.valueSerde = valueSerde; + } + + public void setSpillableComplexComponent(SpillableComplexComponent scc) + { + this.scc = scc; + } + + public SpillableComplexComponent getSpillableComplexComponent() + { + return this.scc; + } + + public void setBucket(long bucket) + { + this.bucket = bucket; + } + + public void setWindowSerde(Serde<Window, Slice> windowSerde) + { + this.windowSerde = windowSerde; + } + + public void setWindowKeyPairSerde(Serde<Pair<Window, K>, Slice> windowKeyPairSerde) + { + this.windowKeyPairSerde = windowKeyPairSerde; + } + + public void setValueSerde(Serde<V, Slice> valueSerde) + { + this.valueSerde = valueSerde; + } + + @Override + public boolean containsWindow(Window window) + { + return windowToKeysMap.containsKey(window); + } + + @Override + public long size() + { + return windowToKeysMap.size(); + } + + @Override + public void remove(Window window) + { + Set<K> keys = windowToKeysMap.get(window); + if (keys != null) { + for (K key : keys) { + windowKeyToValueMap.remove(new ImmutablePair<>(window, key)); + } + } + windowToKeysMap.removeAll(window); + } + + @Override + public void setup(Context.OperatorContext context) + { + if (bucket == 0) { + // choose a bucket that is guaranteed to be unique in Apex + bucket = (context.getValue(Context.DAGContext.APPLICATION_NAME) + "#" + context.getId()).hashCode(); + } + // set default serdes + if (windowSerde == null) { + windowSerde = new SerdeKryoSlice<>(); + } + if (windowKeyPairSerde == null) { + windowKeyPairSerde = new SerdeKryoSlice<>(); + } + if (keySerde == null) { + keySerde = new SerdeKryoSlice<>(); + } + if (valueSerde == null) { + valueSerde = new SerdeKryoSlice<>(); + } + + if (windowKeyToValueMap == null) { + windowKeyToValueMap = scc.newSpillableByteMap(bucket, windowKeyPairSerde, valueSerde); + } + if (windowToKeysMap == null) { + windowToKeysMap = scc.newSpillableSetMultimap(bucket, windowSerde, keySerde); + } + } + + @Override + public void teardown() + { + } + + @Override + public void put(Window window, K key, V value) + { + if (!windowToKeysMap.containsEntry(window, key)) { + windowToKeysMap.put(window, key); + } + windowKeyToValueMap.put(new ImmutablePair<>(window, key), value); + } + + @Override + public Iterable<Map.Entry<K, V>> entries(final Window window) + { + return new Iterable<Map.Entry<K, V>>() + { + @Override + public Iterator<Map.Entry<K, V>> iterator() + { + return new KVIterator(window); + } + }; + } + + @Override + public V get(Window window, K key) + { + return windowKeyToValueMap.get(new ImmutablePair<>(window, key)); + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java new file mode 100644 index 0000000..81f5dbb --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.impl; + +import java.util.Map; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.apex.malhar.lib.utils.serde.SerdeKryoSlice; +import org.apache.apex.malhar.lib.window.Window; +import org.apache.apex.malhar.lib.window.WindowedStorage; + +import com.datatorrent.api.Context; +import com.datatorrent.netlet.util.Slice; + +/** + * This is an implementation of WindowedPlainStorage that makes use of {@link Spillable} data structures + * + * @param <T> Type of the value per window + */ +public class SpillableWindowedPlainStorage<T> implements WindowedStorage.WindowedPlainStorage<T> +{ + @NotNull + private SpillableComplexComponent scc; + private long bucket; + private Serde<Window, Slice> windowSerde; + private Serde<T, Slice> valueSerde; + + protected Spillable.SpillableByteMap<Window, T> windowToDataMap; + + public SpillableWindowedPlainStorage() + { + } + + public SpillableWindowedPlainStorage(long bucket, Serde<Window, Slice> windowSerde, Serde<T, Slice> valueSerde) + { + this.bucket = bucket; + this.windowSerde = windowSerde; + this.valueSerde = valueSerde; + } + + public void setSpillableComplexComponent(SpillableComplexComponent scc) + { + this.scc = scc; + } + + public SpillableComplexComponent getSpillableComplexComponent() + { + return scc; + } + + public void setBucket(long bucket) + { + this.bucket = bucket; + } + + public void setWindowSerde(Serde<Window, Slice> windowSerde) + { + this.windowSerde = windowSerde; + } + + public void setValueSerde(Serde<T, Slice> valueSerde) + { + this.valueSerde = valueSerde; + } + + @Override + public void put(Window window, T value) + { + windowToDataMap.put(window, value); + } + + @Override + public T get(Window window) + { + return windowToDataMap.get(window); + } + + @Override + public Iterable<Map.Entry<Window, T>> entries() + { + return windowToDataMap.entrySet(); + } + + @Override + public boolean containsWindow(Window window) + { + return windowToDataMap.containsKey(window); + } + + @Override + public long size() + { + return windowToDataMap.size(); + } + + @Override + public void remove(Window window) + { + windowToDataMap.remove(window); + } + + @Override + public void setup(Context.OperatorContext context) + { + if (bucket == 0) { + // choose a bucket that is almost guaranteed to be unique + bucket = (context.getValue(Context.DAGContext.APPLICATION_NAME) + "#" + context.getId()).hashCode(); + } + // set default serdes + if (windowSerde == null) { + windowSerde = new SerdeKryoSlice<>(); + } + if (valueSerde == null) { + valueSerde = new SerdeKryoSlice<>(); + } + if (windowToDataMap == null) { + windowToDataMap = scc.newSpillableByteMap(bucket, windowSerde, valueSerde); + } + } + + @Override + public void teardown() + { + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java b/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java index 2ece6b2..8fa814d 100644 --- a/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java +++ b/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java @@ -60,6 +60,11 @@ public class OperatorContextTestHelper this.attributes = map; } + public com.datatorrent.api.Attribute.AttributeMap getAttributes() + { + return attributes; + } + @Override public int getId() { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java new file mode 100644 index 0000000..3b7789c --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl; +import org.apache.apex.malhar.lib.state.spillable.SpillableTestUtils; +import org.apache.apex.malhar.lib.window.impl.SpillableWindowedKeyedStorage; +import org.apache.apex.malhar.lib.window.impl.SpillableWindowedPlainStorage; + +import com.datatorrent.api.Context; +import com.datatorrent.lib.util.KryoCloneUtils; + +/** + * Unit tests for Spillable Windowed Storage + */ +public class SpillableWindowedStorageTest +{ + @Rule + public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta(); + + @Test + public void testWindowedPlainStorage() + { + SpillableComplexComponentImpl sccImpl = new SpillableComplexComponentImpl(testMeta.store); + SpillableWindowedPlainStorage<Integer> storage = new SpillableWindowedPlainStorage<>(); + Window window1 = new Window.TimeWindow<>(1000, 10); + Window window2 = new Window.TimeWindow<>(1010, 10); + Window window3 = new Window.TimeWindow<>(1020, 10); + storage.setSpillableComplexComponent(sccImpl); + storage.getSpillableComplexComponent().setup(testMeta.operatorContext); + storage.setup(testMeta.operatorContext); + + sccImpl.beginWindow(1000); + storage.put(window1, 1); + storage.put(window2, 2); + storage.put(window3, 3); + sccImpl.endWindow(); + sccImpl.beginWindow(1001); + storage.put(window1, 4); + storage.put(window2, 5); + sccImpl.endWindow(); + sccImpl.beforeCheckpoint(1001); + SpillableWindowedPlainStorage<Integer> clonedStorage = KryoCloneUtils.cloneObject(storage); + sccImpl.checkpointed(1001); + + + sccImpl.beginWindow(1002); + storage.put(window1, 6); + storage.put(window2, 7); + sccImpl.endWindow(); + + Assert.assertEquals(6L, storage.get(window1).longValue()); + Assert.assertEquals(7L, storage.get(window2).longValue()); + Assert.assertEquals(3L, storage.get(window3).longValue()); + + sccImpl.beginWindow(1003); + storage.put(window1, 8); + storage.put(window2, 9); + sccImpl.endWindow(); + + // simulating crash here + storage.teardown(); + storage.getSpillableComplexComponent().teardown(); + + storage = clonedStorage; + testMeta.operatorContext.getAttributes().put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 1001L); + storage.getSpillableComplexComponent().setup(testMeta.operatorContext); + storage.setup(testMeta.operatorContext); + + // recovery at window 1002 + sccImpl.beginWindow(1002); + Assert.assertEquals(4L, storage.get(window1).longValue()); + Assert.assertEquals(5L, storage.get(window2).longValue()); + Assert.assertEquals(3L, storage.get(window3).longValue()); + } + + @Test + public void testWindowedKeyedStorage() + { + SpillableComplexComponentImpl sccImpl = new SpillableComplexComponentImpl(testMeta.store); + SpillableWindowedKeyedStorage<String, Integer> storage = new SpillableWindowedKeyedStorage<>(); + Window window1 = new Window.TimeWindow<>(1000, 10); + Window window2 = new Window.TimeWindow<>(1010, 10); + Window window3 = new Window.TimeWindow<>(1020, 10); + storage.setSpillableComplexComponent(sccImpl); + storage.getSpillableComplexComponent().setup(testMeta.operatorContext); + storage.setup(testMeta.operatorContext); + + sccImpl.beginWindow(1000); + storage.put(window1, "x", 1); + storage.put(window2, "x", 2); + storage.put(window3, "x", 3); + sccImpl.endWindow(); + sccImpl.beginWindow(1001); + storage.put(window1, "x", 4); + storage.put(window2, "x", 5); + sccImpl.endWindow(); + sccImpl.beforeCheckpoint(1001); + SpillableWindowedKeyedStorage<String, Integer> clonedStorage = KryoCloneUtils.cloneObject(storage); + sccImpl.checkpointed(1001); + + sccImpl.beginWindow(1002); + storage.put(window1, "x", 6); + storage.put(window2, "x", 7); + storage.put(window2, "y", 8); + sccImpl.endWindow(); + + Assert.assertEquals(6L, storage.get(window1, "x").longValue()); + Assert.assertEquals(7L, storage.get(window2, "x").longValue()); + Assert.assertEquals(3L, storage.get(window3, "x").longValue()); + Assert.assertEquals(8L, storage.get(window2, "y").longValue()); + + // simulating crash here + storage.teardown(); + storage.getSpillableComplexComponent().teardown(); + + storage = clonedStorage; + testMeta.operatorContext.getAttributes().put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 1001L); + storage.getSpillableComplexComponent().setup(testMeta.operatorContext); + storage.setup(testMeta.operatorContext); + + // recovery at window 1002 + sccImpl.beginWindow(1002); + Assert.assertEquals(4L, storage.get(window1, "x").longValue()); + Assert.assertEquals(5L, storage.get(window2, "x").longValue()); + Assert.assertEquals(3L, storage.get(window3, "x").longValue()); + Assert.assertNull(storage.get(window2, "y")); + + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java index 7396994..4edbcd0 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java @@ -18,35 +18,64 @@ */ package org.apache.apex.malhar.lib.window; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; +import java.util.TreeMap; import javax.validation.ValidationException; import org.joda.time.Duration; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl; +import org.apache.apex.malhar.lib.state.spillable.SpillableTestUtils; import org.apache.apex.malhar.lib.window.impl.InMemorySessionWindowedStorage; import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage; import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage; import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl; +import org.apache.apex.malhar.lib.window.impl.SpillableSessionWindowedStorage; +import org.apache.apex.malhar.lib.window.impl.SpillableWindowedKeyedStorage; +import org.apache.apex.malhar.lib.window.impl.SpillableWindowedPlainStorage; import org.apache.apex.malhar.lib.window.impl.WatermarkImpl; import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl; import org.apache.commons.lang3.mutable.MutableLong; -import com.datatorrent.api.Attribute; import com.datatorrent.api.Sink; -import com.datatorrent.lib.helper.OperatorContextTestHelper; import com.datatorrent.lib.testbench.CollectorTestSink; import com.datatorrent.lib.util.KeyValPair; /** * Unit tests for WindowedOperator */ +@RunWith(Parameterized.class) public class WindowedOperatorTest { + @Parameterized.Parameters + public static Collection<Object[]> testParameters() + { + return Arrays.asList(new Object[][]{{false}, {true}}); + } + + @Parameterized.Parameter + public Boolean useSpillable; + + private WindowedStorage.WindowedPlainStorage<WindowState> windowStateStorage; + private WindowedStorage.WindowedPlainStorage<MutableLong> plainDataStorage; + private WindowedStorage.WindowedPlainStorage<Long> plainRetractionStorage; + private WindowedStorage.WindowedKeyedStorage<String, MutableLong> keyedDataStorage; + private WindowedStorage.WindowedKeyedStorage<String, Long> keyedRetractionStorage; + private SpillableComplexComponentImpl sccImpl; + + @Rule + public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta(); + private void verifyValidationFailure(WindowedOperatorImpl windowedOperator, String message) { try { @@ -60,19 +89,61 @@ public class WindowedOperatorTest private WindowedOperatorImpl<Long, MutableLong, Long> createDefaultWindowedOperator() { WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = new WindowedOperatorImpl<>(); - windowedOperator.setDataStorage(new InMemoryWindowedStorage<MutableLong>()); - windowedOperator.setRetractionStorage(new InMemoryWindowedStorage<Long>()); - windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>()); + if (useSpillable) { + sccImpl = new SpillableComplexComponentImpl(testMeta.store); + // TODO: We don't yet support Spillable data structures for window state storage because SpillableByteMapImpl does not yet support iterating over all keys. + windowStateStorage = new InMemoryWindowedStorage<>(); + SpillableWindowedPlainStorage<MutableLong> pds = new SpillableWindowedPlainStorage<>(); + pds.setSpillableComplexComponent(sccImpl); + plainDataStorage = pds; + SpillableWindowedPlainStorage<Long> prs = new SpillableWindowedPlainStorage<>(); + prs.setSpillableComplexComponent(sccImpl); + plainRetractionStorage = prs; + windowedOperator.addComponent("SpillableComplexComponent", sccImpl); + } else { + windowStateStorage = new InMemoryWindowedStorage<>(); + plainDataStorage = new InMemoryWindowedStorage<>(); + plainRetractionStorage = new InMemoryWindowedStorage<>(); + } + windowedOperator.setDataStorage(plainDataStorage); + windowedOperator.setRetractionStorage(plainRetractionStorage); + windowedOperator.setWindowStateStorage(windowStateStorage); windowedOperator.setAccumulation(new SumAccumulation()); return windowedOperator; } - private KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> createDefaultKeyedWindowedOperator() + private KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> createDefaultKeyedWindowedOperator(boolean forSession) { KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = new KeyedWindowedOperatorImpl<>(); - windowedOperator.setDataStorage(new InMemoryWindowedKeyedStorage<String, MutableLong>()); - windowedOperator.setRetractionStorage(new InMemoryWindowedKeyedStorage<String, Long>()); - windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>()); + if (useSpillable) { + sccImpl = new SpillableComplexComponentImpl(testMeta.store); + // TODO: We don't yet support Spillable data structures for window state storage because SpillableByteMapImpl does not yet support iterating over all keys. + windowStateStorage = new InMemoryWindowedStorage<>(); + if (forSession) { + SpillableSessionWindowedStorage<String, MutableLong> sws = new SpillableSessionWindowedStorage<>(); + sws.setSpillableComplexComponent(sccImpl); + keyedDataStorage = sws; + } else { + SpillableWindowedKeyedStorage<String, MutableLong> kds = new SpillableWindowedKeyedStorage<>(); + kds.setSpillableComplexComponent(sccImpl); + keyedDataStorage = kds; + } + SpillableWindowedKeyedStorage<String, Long> krs = new SpillableWindowedKeyedStorage<>(); + krs.setSpillableComplexComponent(sccImpl); + keyedRetractionStorage = krs; + windowedOperator.addComponent("SpillableComplexComponent", sccImpl); + } else { + windowStateStorage = new InMemoryWindowedStorage<>(); + if (forSession) { + keyedDataStorage = new InMemorySessionWindowedStorage<>(); + } else { + keyedDataStorage = new InMemoryWindowedKeyedStorage<>(); + } + keyedRetractionStorage = new InMemoryWindowedKeyedStorage<>(); + } + windowedOperator.setDataStorage(keyedDataStorage); + windowedOperator.setRetractionStorage(keyedRetractionStorage); + windowedOperator.setWindowStateStorage(windowStateStorage); windowedOperator.setAccumulation(new SumAccumulation()); return windowedOperator; } @@ -102,8 +173,6 @@ public class WindowedOperatorTest @Test public void testWatermarkAndAllowedLateness() { - OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1, - new Attribute.AttributeMap.DefaultAttributeMap()); WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator(); CollectorTestSink controlSink = new CollectorTestSink(); @@ -112,26 +181,20 @@ public class WindowedOperatorTest windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000))); windowedOperator.setAllowedLateness(Duration.millis(1000)); - WindowedStorage.WindowedPlainStorage<MutableLong> dataStorage = new InMemoryWindowedStorage<>(); - WindowedStorage.WindowedPlainStorage<WindowState> windowStateStorage = new InMemoryWindowedStorage<>(); - - windowedOperator.setDataStorage(dataStorage); - windowedOperator.setWindowStateStorage(windowStateStorage); - - windowedOperator.setup(context); + windowedOperator.setup(testMeta.operatorContext); windowedOperator.beginWindow(1); windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, 2L)); - Assert.assertEquals("There should be exactly one window in the storage", 1, dataStorage.size()); + Assert.assertEquals("There should be exactly one window in the storage", 1, plainDataStorage.size()); Assert.assertEquals("There should be exactly one window in the storage", 1, windowStateStorage.size()); Map.Entry<Window, WindowState> entry = windowStateStorage.entries().iterator().next(); Window window = entry.getKey(); WindowState windowState = entry.getValue(); Assert.assertEquals(-1, windowState.watermarkArrivalTime); - Assert.assertEquals(2L, dataStorage.get(window).longValue()); + Assert.assertEquals(2L, plainDataStorage.get(window).longValue()); windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, 3L)); - Assert.assertEquals(5L, dataStorage.get(window).longValue()); + Assert.assertEquals(5L, plainDataStorage.get(window).longValue()); windowedOperator.processWatermark(new WatermarkImpl(1200)); windowedOperator.endWindow(); @@ -140,15 +203,16 @@ public class WindowedOperatorTest windowedOperator.beginWindow(2); windowedOperator.processTuple(new Tuple.TimestampedTuple<>(900L, 4L)); - Assert.assertEquals("Late but not too late", 9L, dataStorage.get(window).longValue()); + Assert.assertEquals("Late but not too late", 9L, plainDataStorage.get(window).longValue()); windowedOperator.processWatermark(new WatermarkImpl(3000)); windowedOperator.endWindow(); Assert.assertEquals("We should get two watermark tuples", 2, controlSink.getCount(false)); windowedOperator.beginWindow(3); windowedOperator.processTuple(new Tuple.TimestampedTuple<>(120L, 5L)); // this tuple should be dropped - Assert.assertEquals("The window should be dropped because it's too late", 0, dataStorage.size()); + Assert.assertEquals("The window should be dropped because it's too late", 0, plainDataStorage.size()); Assert.assertEquals("The window should be dropped because it's too late", 0, windowStateStorage.size()); windowedOperator.endWindow(); + windowedOperator.teardown(); } private void testTrigger(TriggerOption.AccumulationMode accumulationMode) @@ -172,9 +236,7 @@ public class WindowedOperatorTest windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000))); CollectorTestSink sink = new CollectorTestSink(); windowedOperator.output.setSink(sink); - OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1, - new Attribute.AttributeMap.DefaultAttributeMap()); - windowedOperator.setup(context); + windowedOperator.setup(testMeta.operatorContext); windowedOperator.beginWindow(1); windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, 2L)); windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, 3L)); @@ -212,6 +274,7 @@ public class WindowedOperatorTest default: throw new RuntimeException("Unknown accumulation mode: " + accumulationMode); } + windowedOperator.teardown(); } @Test @@ -233,71 +296,77 @@ public class WindowedOperatorTest } @Test + public void testTriggerWithAccumulatingModeFiringAllPanes() + { + testTriggerWithAccumulatingModeHelper(false); + } + + @Test public void testTriggerWithAccumulatingModeFiringOnlyUpdatedPanes() { - for (boolean firingOnlyUpdatedPanes : new boolean[]{true, false}) { - WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator(); - TriggerOption triggerOption = new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(1000)) - .accumulatingFiredPanes(); - if (firingOnlyUpdatedPanes) { - triggerOption.firingOnlyUpdatedPanes(); - } - windowedOperator.setTriggerOption(triggerOption); - windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000))); - CollectorTestSink sink = new CollectorTestSink(); - windowedOperator.output.setSink(sink); - OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1, - new Attribute.AttributeMap.DefaultAttributeMap()); - windowedOperator.setup(context); - windowedOperator.beginWindow(1); - windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, 2L)); - windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, 3L)); - windowedOperator.endWindow(); - Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty()); - windowedOperator.beginWindow(2); - windowedOperator.endWindow(); - Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty()); - windowedOperator.beginWindow(3); - windowedOperator.endWindow(); + testTriggerWithAccumulatingModeHelper(true); + } + + public void testTriggerWithAccumulatingModeHelper(boolean firingOnlyUpdatedPanes) + { + WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator(); + TriggerOption triggerOption = new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(1000)) + .accumulatingFiredPanes(); + if (firingOnlyUpdatedPanes) { + triggerOption.firingOnlyUpdatedPanes(); + } + windowedOperator.setTriggerOption(triggerOption); + windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000))); + CollectorTestSink sink = new CollectorTestSink(); + windowedOperator.output.setSink(sink); + windowedOperator.setup(testMeta.operatorContext); + windowedOperator.beginWindow(1); + windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, 2L)); + windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, 3L)); + windowedOperator.endWindow(); + Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty()); + windowedOperator.beginWindow(2); + windowedOperator.endWindow(); + Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty()); + windowedOperator.beginWindow(3); + windowedOperator.endWindow(); + Assert.assertEquals("There should be exactly one tuple for the time trigger", 1, sink.collectedTuples.size()); + Assert.assertEquals(5L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue()); + sink.collectedTuples.clear(); + windowedOperator.beginWindow(4); + windowedOperator.endWindow(); + Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty()); + windowedOperator.beginWindow(5); + windowedOperator.endWindow(); + if (firingOnlyUpdatedPanes) { + Assert.assertTrue("There should not be any trigger since no panes have been updated", sink.collectedTuples + .isEmpty()); + } else { Assert.assertEquals("There should be exactly one tuple for the time trigger", 1, sink.collectedTuples.size()); Assert.assertEquals(5L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue()); - sink.collectedTuples.clear(); - windowedOperator.beginWindow(4); - windowedOperator.endWindow(); - Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty()); - windowedOperator.beginWindow(5); - windowedOperator.endWindow(); - if (firingOnlyUpdatedPanes) { - Assert.assertTrue("There should not be any trigger since no panes have been updated", sink.collectedTuples.isEmpty()); - } else { - Assert.assertEquals("There should be exactly one tuple for the time trigger", 1, sink.collectedTuples.size()); - Assert.assertEquals(5L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue()); - } } + windowedOperator.teardown(); } @Test public void testGlobalWindowAssignment() { - OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1, - new Attribute.AttributeMap.DefaultAttributeMap()); WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator(); windowedOperator.setWindowOption(new WindowOption.GlobalWindow()); - windowedOperator.setup(context); + windowedOperator.setup(testMeta.operatorContext); Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(1100L, 2L)); Collection<? extends Window> windows = windowedValue.getWindows(); Assert.assertEquals(1, windows.size()); Assert.assertEquals(Window.GlobalWindow.INSTANCE, windows.iterator().next()); + windowedOperator.teardown(); } @Test public void testTimeWindowAssignment() { - OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1, - new Attribute.AttributeMap.DefaultAttributeMap()); WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator(); windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000))); - windowedOperator.setup(context); + windowedOperator.setup(testMeta.operatorContext); Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(1100L, 2L)); Collection<? extends Window> windows = windowedValue.getWindows(); Assert.assertEquals(1, windows.size()); @@ -309,11 +378,9 @@ public class WindowedOperatorTest @Test public void testSlidingWindowAssignment() { - OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1, - new Attribute.AttributeMap.DefaultAttributeMap()); WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator(); windowedOperator.setWindowOption(new WindowOption.SlidingTimeWindows(Duration.millis(1000), Duration.millis(200))); - windowedOperator.setup(context); + windowedOperator.setup(testMeta.operatorContext); Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(1600L, 2L)); Collection<? extends Window> windows = windowedValue.getWindows(); Window[] winArray = windows.toArray(new Window[]{}); @@ -328,21 +395,18 @@ public class WindowedOperatorTest Assert.assertEquals(1000, winArray[3].getDurationMillis()); Assert.assertEquals(1600, winArray[4].getBeginTimestamp()); Assert.assertEquals(1000, winArray[4].getDurationMillis()); + windowedOperator.teardown(); } @Test public void testSessionWindows() { - OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1, - new Attribute.AttributeMap.DefaultAttributeMap()); - KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator(); - windowedOperator.setDataStorage(new InMemorySessionWindowedStorage<String, MutableLong>()); - windowedOperator.setRetractionStorage(new InMemorySessionWindowedStorage<String, Long>()); + KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator(true); windowedOperator.setWindowOption(new WindowOption.SessionWindows(Duration.millis(2000))); windowedOperator.setTriggerOption(new TriggerOption().withEarlyFiringsAtEvery(1).accumulatingAndRetractingFiredPanes().firingOnlyUpdatedPanes()); CollectorTestSink<Tuple<KeyValPair<String, Long>>> sink = new CollectorTestSink(); windowedOperator.output.setSink((Sink<Object>)(Sink)sink); - windowedOperator.setup(context); + windowedOperator.setup(testMeta.operatorContext); windowedOperator.beginWindow(1); Tuple<KeyValPair<String, Long>> tuple = new Tuple.TimestampedTuple<>(1100L, new KeyValPair<>("a", 2L)); windowedOperator.processTuple(tuple); @@ -400,16 +464,24 @@ public class WindowedOperatorTest Assert.assertEquals(3, sink.getCount(false)); // retraction of the two old windows + Map<Window, KeyValPair<String, Long>> tuples = new TreeMap<>(); out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(0); Assert.assertEquals(1, out.getWindows().size()); - Assert.assertEquals(window2, out.getWindows().iterator().next()); - Assert.assertEquals("a", out.getValue().getKey()); - Assert.assertEquals(-5L, out.getValue().getValue().longValue()); + tuples.put(out.getWindows().iterator().next(), out.getValue()); + out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(1); Assert.assertEquals(1, out.getWindows().size()); - Assert.assertEquals(window3, out.getWindows().iterator().next()); - Assert.assertEquals("a", out.getValue().getKey()); - Assert.assertEquals(-4L, out.getValue().getValue().longValue()); + tuples.put(out.getWindows().iterator().next(), out.getValue()); + + Iterator<Map.Entry<Window, KeyValPair<String, Long>>> iterator = tuples.entrySet().iterator(); + Map.Entry<Window, KeyValPair<String, Long>> entry = iterator.next(); + Assert.assertEquals(window2, entry.getKey()); + Assert.assertEquals("a", entry.getValue().getKey()); + Assert.assertEquals(-5L, entry.getValue().getValue().longValue()); + entry = iterator.next(); + Assert.assertEquals(window3, entry.getKey()); + Assert.assertEquals("a", entry.getValue().getKey()); + Assert.assertEquals(-4L, entry.getValue().getValue().longValue()); // normal trigger out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(2); @@ -421,32 +493,30 @@ public class WindowedOperatorTest Assert.assertEquals(12L, out.getValue().getValue().longValue()); windowedOperator.endWindow(); + windowedOperator.teardown(); } @Test public void testKeyedAccumulation() { - OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1, - new Attribute.AttributeMap.DefaultAttributeMap()); - KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator(); + KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator(false); windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000))); - WindowedStorage.WindowedKeyedStorage<String, MutableLong> dataStorage = new InMemoryWindowedKeyedStorage<>(); - windowedOperator.setDataStorage(dataStorage); - windowedOperator.setup(context); + windowedOperator.setup(testMeta.operatorContext); windowedOperator.beginWindow(1); windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, new KeyValPair<>("a", 2L))); windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, new KeyValPair<>("a", 3L))); windowedOperator.processTuple(new Tuple.TimestampedTuple<>(300L, new KeyValPair<>("b", 4L))); windowedOperator.processTuple(new Tuple.TimestampedTuple<>(150L, new KeyValPair<>("b", 5L))); windowedOperator.endWindow(); - Assert.assertEquals(1, dataStorage.size()); - Assert.assertEquals(5L, dataStorage.get(new Window.TimeWindow(0, 1000), "a").longValue()); - Assert.assertEquals(9L, dataStorage.get(new Window.TimeWindow(0, 1000), "b").longValue()); + Assert.assertEquals(1, keyedDataStorage.size()); + Assert.assertEquals(5L, keyedDataStorage.get(new Window.TimeWindow(0, 1000), "a").longValue()); + Assert.assertEquals(9L, keyedDataStorage.get(new Window.TimeWindow(0, 1000), "b").longValue()); + windowedOperator.teardown(); } private void testKeyedTrigger(TriggerOption.AccumulationMode accumulationMode) { - KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator(); + KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator(false); TriggerOption triggerOption = new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(1000)); switch (accumulationMode) { case ACCUMULATING: @@ -465,9 +535,7 @@ public class WindowedOperatorTest windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000))); CollectorTestSink<Tuple<KeyValPair<String, Long>>> sink = new CollectorTestSink(); windowedOperator.output.setSink((Sink<Object>)(Sink)sink); - OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1, - new Attribute.AttributeMap.DefaultAttributeMap()); - windowedOperator.setup(context); + windowedOperator.setup(testMeta.operatorContext); windowedOperator.beginWindow(1); windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, new KeyValPair<>("a", 2L))); windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, new KeyValPair<>("b", 3L))); @@ -530,6 +598,7 @@ public class WindowedOperatorTest default: throw new RuntimeException("Unknown accumulation mode: " + accumulationMode); } + windowedOperator.teardown(); } @Test
