Repository: apex-malhar Updated Branches: refs/heads/master b811e3356 -> 9f9da0ee1
APEXMALHAR-2130 WindowStorage interface changes in preparation of incorporating spillable data structures Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/9f9da0ee Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/9f9da0ee Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/9f9da0ee Branch: refs/heads/master Commit: 9f9da0ee15e00b51d57725c75119c145697bfd64 Parents: b811e33 Author: David Yan <[email protected]> Authored: Sun Sep 11 00:30:20 2016 -0700 Committer: David Yan <[email protected]> Committed: Wed Sep 14 11:24:56 2016 -0700 ---------------------------------------------------------------------- .../stream/sample/complete/AutoComplete.java | 2 +- .../sample/complete/TopWikipediaSessions.java | 14 +- .../sample/complete/TwitterAutoComplete.java | 2 +- .../sample/cookbook/MaxPerKeyExamples.java | 2 +- .../apex/malhar/lib/window/Accumulation.java | 2 +- .../lib/window/SessionWindowedStorage.java | 11 +- .../apache/apex/malhar/lib/window/Tuple.java | 70 +++++++++- .../apache/apex/malhar/lib/window/Window.java | 125 +++++++++++------ .../apex/malhar/lib/window/WindowOption.java | 4 +- .../malhar/lib/window/WindowedKeyedStorage.java | 81 ----------- .../malhar/lib/window/WindowedOperator.java | 7 - .../apex/malhar/lib/window/WindowedStorage.java | 98 +++++++++----- .../window/impl/AbstractWindowedOperator.java | 135 ++++++++++++++----- .../impl/InMemorySessionWindowedStorage.java | 87 ++++++++++++ .../impl/InMemoryWindowedKeyedStorage.java | 42 +----- .../window/impl/InMemoryWindowedStorage.java | 22 ++- .../window/impl/KeyedWindowedOperatorImpl.java | 14 +- .../lib/window/impl/WindowedOperatorImpl.java | 2 +- .../malhar/lib/window/WindowedOperatorTest.java | 44 +++--- .../apex/malhar/stream/api/util/TupleUtil.java | 13 +- 20 files changed, 481 insertions(+), 296 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java index 6b208aa..2db59b6 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java @@ -208,7 +208,7 @@ public class AutoComplete implements StreamingApplication public Tuple<KeyValPair<String, CompletionCandidate>> f(KeyValPair<String, CompletionCandidate> tuple) { // TODO: Should be removed after Auto-wrapping is supported. - return new Tuple.WindowedTuple<>(Window.GLOBAL_WINDOW, tuple); + return new Tuple.WindowedTuple<>(Window.GlobalWindow.INSTANCE, tuple); } }); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java index 5ac3e7f..68ec733 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java @@ -28,6 +28,7 @@ import org.joda.time.Duration; import org.apache.apex.malhar.lib.window.TriggerOption; import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.Window; import 
org.apache.apex.malhar.lib.window.WindowOption; import org.apache.apex.malhar.lib.window.accumulation.TopN; import org.apache.apex.malhar.stream.api.ApexStream; @@ -269,7 +270,8 @@ public class TopWikipediaSessions implements StreamingApplication @Override public TempWrapper f(Tuple.WindowedTuple<KeyValPair<String, Long>> input) { - return new TempWrapper(input.getValue(), input.getWindows().get(0).getBeginTimestamp()); + Window window = input.getWindows().iterator().next(); + return new TempWrapper(input.getValue(), window.getBeginTimestamp()); } }, name("TempWrapper")) @@ -290,14 +292,15 @@ public class TopWikipediaSessions implements StreamingApplication @Override public Tuple.WindowedTuple<KeyValPair<String, Long>> f(Tuple.WindowedTuple<KeyValPair<String, Long>> input) { - return new Tuple.WindowedTuple<KeyValPair<String, Long>>(input.getWindows().get(0), new KeyValPair<String, Long>( - input.getValue().getKey() + " : " + input.getWindows().get(0).getBeginTimestamp() + " : " + input.getWindows().get(0).getDurationMillis(), + Window window = input.getWindows().iterator().next(); + return new Tuple.WindowedTuple<KeyValPair<String, Long>>(window, new KeyValPair<String, Long>( + input.getValue().getKey() + " : " + window.getBeginTimestamp() + " : " + window.getDurationMillis(), input.getValue().getValue())); } } /** - * A flapmap function that turns the result into readable format. + * A flatmap function that turns the result into readable format. 
*/ static class FormatOutputDoFn implements Function.FlatMapFunction<Tuple.WindowedTuple<List<TempWrapper>>, String> { @@ -308,7 +311,8 @@ public class TopWikipediaSessions implements StreamingApplication for (TempWrapper item : input.getValue()) { String session = item.getValue().getKey(); long count = item.getValue().getValue(); - result.add(session + " + " + count + " : " + input.getWindows().get(0).getBeginTimestamp()); + Window window = input.getWindows().iterator().next(); + result.add(session + " + " + count + " : " + window.getBeginTimestamp()); } return result; } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java index ffd2a03..4fc80ea 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java @@ -125,7 +125,7 @@ public class TwitterAutoComplete implements StreamingApplication public Tuple<KeyValPair<String, CompletionCandidate>> f(KeyValPair<String, CompletionCandidate> tuple) { // TODO: Should be removed after Auto-wrapping is supported. 
- return new Tuple.WindowedTuple<>(Window.GLOBAL_WINDOW, tuple); + return new Tuple.WindowedTuple<>(Window.GlobalWindow.INSTANCE, tuple); } }, name("TopNByKey")); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java index 4fafa5a..9fd9495 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java @@ -110,7 +110,7 @@ public class MaxPerKeyExamples implements StreamingApplication @Override public Tuple<KeyValPair<Integer, Double>> f(KeyValPair<Integer, Double> input) { - return new Tuple.WindowedTuple<KeyValPair<Integer, Double>>(Window.GLOBAL_WINDOW, input); + return new Tuple.WindowedTuple<KeyValPair<Integer, Double>>(Window.GlobalWindow.INSTANCE, input); } }, name("MaxPerMonth")); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java b/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java index 89215a1..03f7ff7 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java @@ -51,7 +51,7 @@ public interface Accumulation<InputT, AccumT, OutputT> AccumT accumulate(AccumT accumulatedValue, InputT input); /** - * Merges two accumulated value into 
one + * Merges two accumulated values into one * * @param accumulatedValue1 * @param accumulatedValue2 http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java index 404e591..4cb2b1a 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java @@ -32,9 +32,18 @@ import org.apache.hadoop.classification.InterfaceStability; * @since 3.5.0 */ @InterfaceStability.Evolving -public interface SessionWindowedStorage<K, V> extends WindowedKeyedStorage<K, V> +public interface SessionWindowedStorage<K, V> extends WindowedStorage.WindowedKeyedStorage<K, V> { /** + * Migrate the data from one window to another. This will invalidate fromWindow in the storage and move the + * data to toWindow, and overwrite any existing data in toWindow + * + * @param fromWindow + * @param toWindow + */ + void migrateWindow(Window.SessionWindow<K> fromWindow, Window.SessionWindow<K> toWindow); + + /** * Given the key, the timestamp and the gap, gets the data that falls into timestamp +/- gap. * This is used for getting the entry the data given the timestamp belongs to, and for determining whether to merge * session windows. 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java b/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java index eaf4d29..aea6bf6 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java @@ -18,8 +18,9 @@ */ package org.apache.apex.malhar.lib.window; -import java.util.ArrayList; -import java.util.List; +import java.util.Collection; +import java.util.Set; +import java.util.TreeSet; import org.apache.hadoop.classification.InterfaceStability; @@ -72,6 +73,27 @@ public interface Tuple<T> { return value.toString(); } + + @Override + public int hashCode() + { + return value.hashCode(); + } + + @Override + public boolean equals(Object obj) + { + if (obj instanceof PlainTuple) { + PlainTuple<T> other = (PlainTuple<T>)obj; + if (this.value == null) { + return other.value == null; + } else { + return this.value.equals(other.value); + } + } else { + return false; + } + } } /** @@ -103,6 +125,23 @@ public interface Tuple<T> { this.timestamp = timestamp; } + + @Override + public int hashCode() + { + return super.hashCode() ^ (int)(timestamp & 0xffffff); + } + + @Override + public boolean equals(Object obj) + { + if (obj instanceof TimestampedTuple && super.equals(obj)) { + TimestampedTuple<T> other = (TimestampedTuple<T>)obj; + return (this.timestamp == other.timestamp); + } else { + return false; + } + } } /** @@ -112,7 +151,7 @@ public interface Tuple<T> */ class WindowedTuple<T> extends TimestampedTuple<T> { - private List<Window> windows = new ArrayList<>(); + private Set<Window> windows = new TreeSet<>(); public WindowedTuple() { @@ -124,7 +163,13 @@ public interface Tuple<T> this.windows.add(window); } - public List<Window> getWindows() 
+ public WindowedTuple(Collection<? extends Window> windows, long timestamp, T value) + { + super(timestamp, value); + this.windows.addAll(windows); + } + + public Collection<Window> getWindows() { return windows; } @@ -133,6 +178,23 @@ public interface Tuple<T> { this.windows.add(window); } + + @Override + public int hashCode() + { + return super.hashCode() ^ windows.hashCode(); + } + + @Override + public boolean equals(Object obj) + { + if (obj instanceof WindowedTuple && super.equals(obj)) { + WindowedTuple<T> other = (WindowedTuple<T>)obj; + return this.windows.equals(other.windows); + } else { + return false; + } + } } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java b/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java index 32a028a..50d6445 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java @@ -18,27 +18,33 @@ */ package org.apache.apex.malhar.lib.window; -import java.util.Comparator; - import org.apache.hadoop.classification.InterfaceStability; /** - * This interface describes the individual window. + * This interface describes the classes that represent individual windows. + * + * @param <WINDOW> window type the object of this class can call compareTo * * @since 3.5.0 */ @InterfaceStability.Evolving -public interface Window +public interface Window<WINDOW extends Comparable<WINDOW>> extends Comparable<WINDOW> { long getBeginTimestamp(); long getDurationMillis(); /** - * Global window means there is only one window, or no window depending on how you look at it. 
+ * Global window means there is only one window that spans the entire life time of the application */ - class GlobalWindow implements Window + class GlobalWindow implements Window<GlobalWindow> { + + /** + * The singleton global window + */ + public static final GlobalWindow INSTANCE = new GlobalWindow(); + private GlobalWindow() { } @@ -57,56 +63,37 @@ public interface Window @Override public boolean equals(Object other) { - return (other instanceof GlobalWindow); + return this == other; } - } - class DefaultComparator implements Comparator<Window> - { - private DefaultComparator() + @Override + public int compareTo(GlobalWindow o) { + return 0; } @Override - public int compare(Window o1, Window o2) + public String toString() { - if (o1.getBeginTimestamp() < o2.getBeginTimestamp()) { - return -1; - } else if (o1.getBeginTimestamp() > o2.getBeginTimestamp()) { - return 1; - } else if (o1.getDurationMillis() < o2.getDurationMillis()) { - return -1; - } else if (o1.getDurationMillis() > o2.getDurationMillis()) { - return 1; - } else if (o1 instanceof SessionWindow && o2 instanceof SessionWindow) { - return Long.compare(((SessionWindow)o1).getKey().hashCode(), ((SessionWindow)o2).getKey().hashCode()); - } else { - return 0; - } + return "[GlobalWindow]"; } } /** - * The singleton global window - */ - GlobalWindow GLOBAL_WINDOW = new GlobalWindow(); - - /** - * The singleton default comparator of windows - */ - Comparator<Window> DEFAULT_COMPARATOR = new DefaultComparator(); - - /** * TimeWindow is a window that represents a time slice + * + * @param <WINDOW> window type the object of this class can call compareTo */ - class TimeWindow implements Window + class TimeWindow<WINDOW extends TimeWindow<WINDOW>> implements Window<WINDOW> { - protected long beginTimestamp; - protected long durationMillis; + protected final long beginTimestamp; + protected final long durationMillis; private TimeWindow() { // for kryo + this.beginTimestamp = -1; + this.durationMillis = 0; } 
public TimeWindow(long beginTimestamp, long durationMillis) @@ -148,20 +135,49 @@ public interface Window } } + @Override + public int hashCode() + { + int result = (int)(beginTimestamp ^ (beginTimestamp >>> 32)); + result = 31 * result + (int)(durationMillis ^ (durationMillis >>> 32)); + return result; + } + + @Override + public int compareTo(TimeWindow o) + { + if (this.getBeginTimestamp() < o.getBeginTimestamp()) { + return -1; + } else if (this.getBeginTimestamp() > o.getBeginTimestamp()) { + return 1; + } else if (this.getDurationMillis() < o.getDurationMillis()) { + return -1; + } else if (this.getDurationMillis() > o.getDurationMillis()) { + return 1; + } + return 0; + } + + @Override + public String toString() + { + return "[TimeWindow " + getBeginTimestamp() + "(" + getDurationMillis() + ")]"; + } } /** * SessionWindow is a window that represents a time slice for a key, with the time slice being variable length. * - * @param <K> + * @param <K> the key type for the session */ - class SessionWindow<K> extends TimeWindow + class SessionWindow<K> extends TimeWindow<SessionWindow<K>> { - private K key; + private final K key; private SessionWindow() { // for kryo + this.key = null; } public SessionWindow(K key, long beginTimestamp, long duration) @@ -192,5 +208,32 @@ public interface Window return false; } } + + @Override + public int hashCode() + { + return (key.hashCode() << 16) | super.hashCode(); + } + + @Override + public int compareTo(SessionWindow<K> o) + { + int val = super.compareTo(o); + if (val == 0) { + if (this.getKey() instanceof Comparable) { + return ((Comparable<K>)this.getKey()).compareTo(o.getKey()); + } else { + return Long.compare(this.getKey().hashCode(), o.getKey().hashCode()); + } + } else { + return val; + } + } + + @Override + public String toString() + { + return "[SessionWindow key=" + getKey() + " " + getBeginTimestamp() + "(" + getDurationMillis() + ")]"; + } } } 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java index de244fb..099709d 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java @@ -63,7 +63,7 @@ public interface WindowOption /** * Gets the duration of the time window * - * @return + * @return the duration of the time window */ public Duration getDuration() { @@ -74,7 +74,7 @@ public interface WindowOption * The time window should be a sliding window with the given slide duration * * @param duration - * @return + * @return the SlidingTimeWindows */ public SlidingTimeWindows slideBy(Duration duration) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedKeyedStorage.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedKeyedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedKeyedStorage.java deleted file mode 100644 index d59ee40..0000000 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedKeyedStorage.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.window; - -import java.util.Map; - -import org.apache.apex.malhar.lib.state.spillable.Spillable; -import org.apache.hadoop.classification.InterfaceStability; - -/** - * This interface is for a key/value store for storing data for windowed streams. - * The key to this key/value store is a pair of (Window, K). - * Also, this class may go away soon as there are plans to incorporate {@link Spillable} data structures - * in the near future. - * - * Note that this interface expects that the implementation takes care of checkpoint recovery. - * - * - * @since 3.5.0 - */ -@InterfaceStability.Unstable -public interface WindowedKeyedStorage<K, V> extends WindowedStorage<Map<K, V>> -{ - /** - * Sets the data associated with the given window and the key - * - * @param window - * @param key - * @param value - */ - void put(Window window, K key, V value); - - /** - * Gets the key/value pairs associated with the given window - * - * @param window - * @return - */ - Iterable<Map.Entry<K, V>> entrySet(Window window); - - /** - * Gets the data associated with the given window and the key - * - * @param window - * @param key - * @return - */ - V get(Window window, K key); - - /** - * Removes all the data associated with the given window - * - * @param window - */ - void remove(Window window); - - /** - * Migrate the data from one window to another. 
This will invalidate fromWindow in the storage and move the - * data to toWindow, and overwrite any existing data in toWindow - * - * @param fromWindow - * @param toWindow - */ - void migrateWindow(Window fromWindow, Window toWindow); -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java index 5da531c..ccc7ae1 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java @@ -61,13 +61,6 @@ public interface WindowedOperator<InputT> void setAllowedLateness(Duration allowedLateness); /** - * This methods sets the storage for the meta data for each window - * - * @param storageAgent - */ - void setWindowStateStorage(WindowedStorage<WindowState> storageAgent); - - /** * This sets the function that extracts the timestamp from the input tuple * * @param timestampExtractor http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java index c2b3f08..42ecdae 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java @@ -20,23 +20,19 @@ package org.apache.apex.malhar.lib.window; import java.util.Map; -import org.apache.apex.malhar.lib.state.spillable.Spillable; import 
org.apache.hadoop.classification.InterfaceStability; +import com.datatorrent.api.Component; +import com.datatorrent.api.Context; + /** * WindowedStorage is a key-value store with the key being the window. The implementation of this interface should * make sure checkpointing and recovery will be done correctly. - * Note that this interface may go away soon as there are plans to incorporate {@link Spillable} data structures in the - * near future. - * - * @param <T> The type of the data that is stored per window - * - * TODO: Look at the possibility of integrating spillable data structure: https://issues.apache.org/jira/browse/APEXMALHAR-2026 * * @since 3.5.0 */ @InterfaceStability.Unstable -public interface WindowedStorage<T> extends Iterable<Map.Entry<Window, T>> +public interface WindowedStorage extends Component<Context.OperatorContext> { /** * Returns true if the storage contains this window @@ -53,22 +49,6 @@ public interface WindowedStorage<T> extends Iterable<Map.Entry<Window, T>> long size(); /** - * Sets the data associated with the given window - * - * @param window - * @param value - */ - void put(Window window, T value); - - /** - * Gets the value associated with the given window - * - * @param window - * @return - */ - T get(Window window); - - /** * Removes all the data associated with the given window. This does NOT mean removing the window in checkpointed state * * @param window @@ -76,18 +56,72 @@ public interface WindowedStorage<T> extends Iterable<Map.Entry<Window, T>> void remove(Window window); /** - * Migrate the data from one window to another. This will invalidate fromWindow in the storage and move the - * data to toWindow, and overwrite any existing data in toWindow + * This interface handles plain value per window. If there is a key/value map for each window, use + * {@link WindowedKeyedStorage}. 
Also note that a single T object is assumed to be fit in memory * - * @param fromWindow - * @param toWindow + * Note that this interface expects that the implementation takes care of checkpoint recovery. + * + * @param <T> The type of the data that is stored per window */ - void migrateWindow(Window fromWindow, Window toWindow); + interface WindowedPlainStorage<T> extends WindowedStorage + { + /** + * Sets the data associated with the given window + * + * @param window + * @param value + */ + void put(Window window, T value); + + /** + * Gets the value associated with the given window + * + * @param window + * @return + */ + T get(Window window); + + /** + * Returns the iterable of the entries in the storage + * + * @return + */ + Iterable<Map.Entry<Window, T>> entries(); + } /** - * Returns the iterable of the entries in the storage + * This interface is for a store that maps Windows to maps of key value pairs. + * + * Note that this interface expects that the implementation takes care of checkpoint recovery. 
* - * @return */ - Iterable<Map.Entry<Window, T>> entrySet(); + interface WindowedKeyedStorage<K, V> extends WindowedStorage + { + /** + * Sets the data associated with the given window and the key + * + * @param window + * @param key + * @param value + */ + void put(Window window, K key, V value); + + /** + * Gets an iterable object over the key/value pairs associated with the given window + * + * @param window + * @return + */ + Iterable<Map.Entry<K, V>> entries(Window window); + + /** + * Gets the data associated with the given window and the key + * + * @param window + * @param key + * @return + */ + V get(Window window, K key); + + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java index 7abe9b6..f90d47d 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java @@ -19,6 +19,9 @@ package org.apache.apex.malhar.lib.window.impl; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -29,6 +32,7 @@ import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.state.spillable.WindowListener; import org.apache.apex.malhar.lib.window.Accumulation; import org.apache.apex.malhar.lib.window.ControlTuple; import org.apache.apex.malhar.lib.window.TriggerOption; @@ -42,9 +46,11 @@ import org.apache.hadoop.classification.InterfaceStability; import 
com.google.common.base.Function; +import com.datatorrent.api.Component; import com.datatorrent.api.Context; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; import com.datatorrent.api.annotation.InputPortFieldAnnotation; import com.datatorrent.common.util.BaseOperator; @@ -63,13 +69,13 @@ import com.datatorrent.common.util.BaseOperator; */ @InterfaceStability.Evolving public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT extends WindowedStorage, RetractionStorageT extends WindowedStorage, AccumulationT extends Accumulation> - extends BaseOperator implements WindowedOperator<InputT> + extends BaseOperator implements WindowedOperator<InputT>, Operator.CheckpointNotificationListener { protected WindowOption windowOption; protected TriggerOption triggerOption; protected long allowedLatenessMillis = -1; - protected WindowedStorage<WindowState> windowStateMap; + protected WindowedStorage.WindowedPlainStorage<WindowState> windowStateMap; private Function<InputT, Long> timestampExtractor; @@ -81,12 +87,16 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext private long lateTriggerCount; private long lateTriggerMillis; private long currentDerivedTimestamp = -1; - private long windowWidthMillis; + private long timeIncrement; private long fixedWatermarkMillis = -1; + + private Map<String, Component<Context.OperatorContext>> components = new HashMap<>(); + protected DataStorageT dataStorage; protected RetractionStorageT retractionStorage; protected AccumulationT accumulation; + private static final transient Collection<? 
extends Window> GLOBAL_WINDOW_SINGLETON_SET = Collections.singleton(Window.GlobalWindow.INSTANCE); private static final transient Logger LOG = LoggerFactory.getLogger(AbstractWindowedOperator.class); public final transient DefaultInputPort<Tuple<InputT>> input = new DefaultInputPort<Tuple<InputT>>() @@ -156,9 +166,6 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext public void setWindowOption(WindowOption windowOption) { this.windowOption = windowOption; - if (this.windowOption instanceof WindowOption.GlobalWindow) { - windowStateMap.put(Window.GLOBAL_WINDOW, new WindowState()); - } } @Override @@ -216,6 +223,11 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext this.retractionStorage = storageAgent; } + public void addComponent(String key, Component<Context.OperatorContext> component) + { + components.put(key, component); + } + /** * Sets the accumulation, which basically tells the WindowedOperator what to do if a new tuple comes in and what * to put in the pane when a trigger is fired @@ -227,8 +239,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext this.accumulation = accumulation; } - @Override - public void setWindowStateStorage(WindowedStorage<WindowState> storageAgent) + public void setWindowStateStorage(WindowedStorage.WindowedPlainStorage<WindowState> storageAgent) { this.windowStateMap = storageAgent; } @@ -284,12 +295,9 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext if (windowOption == null && input instanceof Tuple.WindowedTuple) { // inherit the windows from upstream return (Tuple.WindowedTuple<InputT>)input; + } else { + return new Tuple.WindowedTuple<>(assignWindows(input), extractTimestamp(input), input.getValue()); } - Tuple.WindowedTuple<InputT> windowedTuple = new Tuple.WindowedTuple<>(); - windowedTuple.setValue(input.getValue()); - windowedTuple.setTimestamp(extractTimestamp(input)); - 
assignWindows(windowedTuple.getWindows(), input); - return windowedTuple; } private long extractTimestamp(Tuple<InputT> tuple) @@ -305,29 +313,31 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext } } - private void assignWindows(List<Window> windows, Tuple<InputT> inputTuple) + private Collection<? extends Window> assignWindows(Tuple<InputT> inputTuple) { if (windowOption instanceof WindowOption.GlobalWindow) { - windows.add(Window.GLOBAL_WINDOW); + return GLOBAL_WINDOW_SINGLETON_SET; } else { long timestamp = extractTimestamp(inputTuple); if (windowOption instanceof WindowOption.TimeWindows) { - - for (Window.TimeWindow window : getTimeWindowsForTimestamp(timestamp)) { + Collection<? extends Window> windows = getTimeWindowsForTimestamp(timestamp); + for (Window window : windows) { if (!windowStateMap.containsWindow(window)) { windowStateMap.put(window, new WindowState()); } - windows.add(window); } + return windows; } else if (windowOption instanceof WindowOption.SessionWindows) { - assignSessionWindows(windows, timestamp, inputTuple); + return assignSessionWindows(timestamp, inputTuple); + } else { + throw new IllegalStateException("Unsupported Window Option: " + windowOption.getClass()); } } } - protected void assignSessionWindows(List<Window> windows, long timestamp, Tuple<InputT> inputTuple) + protected Collection<Window.SessionWindow> assignSessionWindows(long timestamp, Tuple<InputT> inputTuple) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("Session window require keyed tuples"); } /** @@ -338,7 +348,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext * @param timestamp * @return */ - private List<Window.TimeWindow> getTimeWindowsForTimestamp(long timestamp) + private Collection<Window.TimeWindow> getTimeWindowsForTimestamp(long timestamp) { List<Window.TimeWindow> windows = new ArrayList<>(); if (windowOption instanceof WindowOption.TimeWindows) { 
@@ -348,8 +358,6 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext if (windowOption instanceof WindowOption.SlidingTimeWindows) { long slideBy = ((WindowOption.SlidingTimeWindows)windowOption).getSlideByDuration().getMillis(); // add the sliding windows front and back - // Note: this messes up the order of the window and we might want to revisit this if the order of the windows - // matter for (long slideBeginTimestamp = beginTimestamp - slideBy; slideBeginTimestamp <= timestamp && timestamp < slideBeginTimestamp + durationMillis; slideBeginTimestamp -= slideBy) { @@ -389,8 +397,32 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext @Override public void setup(Context.OperatorContext context) { - this.windowWidthMillis = context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS); + this.timeIncrement = context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) * context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS); validate(); + windowStateMap.setup(context); + dataStorage.setup(context); + if (retractionStorage != null) { + retractionStorage.setup(context); + } + for (Component component : components.values()) { + component.setup(context); + } + if (this.windowOption instanceof WindowOption.GlobalWindow) { + windowStateMap.put(Window.GlobalWindow.INSTANCE, new WindowState()); + } + } + + @Override + public void teardown() + { + windowStateMap.teardown(); + dataStorage.teardown(); + if (retractionStorage != null) { + retractionStorage.teardown(); + } + for (Component component : components.values()) { + component.teardown(); + } } /** @@ -399,10 +431,16 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext @Override public void beginWindow(long windowId) { + for (Component component : components.values()) { + if (component instanceof WindowListener) { + ((WindowListener)component).beginWindow(windowId); + } + } if (currentDerivedTimestamp == 
-1) { - currentDerivedTimestamp = ((windowId >> 32) * 1000) + (windowId & 0xffffffffL); + // TODO: once we are able to get the firstWindowMillis from Apex Core API, we should use that instead + currentDerivedTimestamp = (windowId >> 32) * 1000; } else { - currentDerivedTimestamp += windowWidthMillis; + currentDerivedTimestamp += timeIncrement; } watermarkTimestamp = -1; } @@ -417,6 +455,12 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext // TODO: May want to revisit this if the application cares more about latency than idempotency processWatermarkAtEndWindow(); fireTimeTriggers(); + + for (Component component : components.values()) { + if (component instanceof WindowListener) { + ((WindowListener)component).endWindow(); + } + } } private void processWatermarkAtEndWindow() @@ -429,7 +473,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext long horizon = watermarkTimestamp - allowedLatenessMillis; - for (Iterator<Map.Entry<Window, WindowState>> it = windowStateMap.iterator(); it.hasNext(); ) { + for (Iterator<Map.Entry<Window, WindowState>> it = windowStateMap.entries().iterator(); it.hasNext(); ) { Map.Entry<Window, WindowState> entry = it.next(); Window window = entry.getKey(); WindowState windowState = entry.getValue(); @@ -446,7 +490,9 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext // discard this window because it's too late now it.remove(); dataStorage.remove(window); - retractionStorage.remove(window); + if (retractionStorage != null) { + retractionStorage.remove(window); + } } } } @@ -457,7 +503,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext private void fireTimeTriggers() { if (earlyTriggerMillis > 0 || lateTriggerMillis > 0) { - for (Map.Entry<Window, WindowState> entry : windowStateMap.entrySet()) { + for (Map.Entry<Window, WindowState> entry : windowStateMap.entries()) { Window window = entry.getKey(); 
WindowState windowState = entry.getValue(); if (windowState.watermarkArrivalTime == -1) { @@ -511,4 +557,33 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext dataStorage.remove(window); } + @Override + public void beforeCheckpoint(long windowId) + { + for (Component component : components.values()) { + if (component instanceof CheckpointNotificationListener) { + ((CheckpointNotificationListener)component).beforeCheckpoint(windowId); + } + } + } + + @Override + public void checkpointed(long windowId) + { + for (Component component : components.values()) { + if (component instanceof CheckpointNotificationListener) { + ((CheckpointNotificationListener)component).checkpointed(windowId); + } + } + } + + @Override + public void committed(long windowId) + { + for (Component component : components.values()) { + if (component instanceof CheckpointNotificationListener) { + ((CheckpointNotificationListener)component).committed(windowId); + } + } + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java new file mode 100644 index 0000000..fdceb4d --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.impl; + +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; + +import org.apache.apex.malhar.lib.window.SessionWindowedStorage; +import org.apache.apex.malhar.lib.window.Window; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * This is the in-memory implementation of {@link WindowedKeyedStorage}. Do not use this class if you have a large state that + * can't be fit in memory. 
+ * + * @since 3.5.0 + */ [email protected] +public class InMemorySessionWindowedStorage<K, V> extends InMemoryWindowedKeyedStorage<K, V> + implements SessionWindowedStorage<K, V> +{ + private Map<K, TreeSet<Window.SessionWindow<K>>> keyToWindows = new HashMap<>(); + + @Override + public void put(Window window, K key, V value) + { + super.put(window, key, value); + TreeSet<Window.SessionWindow<K>> sessionWindows = keyToWindows.get(key); + if (sessionWindows == null) { + sessionWindows = new TreeSet<>(); + keyToWindows.put(key, sessionWindows); + } + sessionWindows.add((Window.SessionWindow<K>)window); + } + + @Override + public void migrateWindow(Window.SessionWindow<K> fromWindow, Window.SessionWindow<K> toWindow) + { + if (containsWindow(fromWindow)) { + map.put(toWindow, map.remove(fromWindow)); + } + } + + @Override + public Collection<Map.Entry<Window.SessionWindow<K>, V>> getSessionEntries(K key, long timestamp, long gap) + { + List<Map.Entry<Window.SessionWindow<K>, V>> results = new ArrayList<>(); + TreeSet<Window.SessionWindow<K>> sessionWindows = keyToWindows.get(key); + if (sessionWindows != null) { + Window.SessionWindow<K> refWindow = new Window.SessionWindow<>(key, timestamp, 1); + Window.SessionWindow<K> floor = sessionWindows.floor(refWindow); + if (floor != null) { + if (floor.getBeginTimestamp() + floor.getDurationMillis() + gap > timestamp) { + results.add(new AbstractMap.SimpleEntry<>(floor, map.get(floor).get(key))); + } + } + Window.SessionWindow<K> higher = sessionWindows.higher(refWindow); + if (higher != null) { + if (higher.getBeginTimestamp() - gap <= timestamp) { + results.add(new AbstractMap.SimpleEntry<>(higher, map.get(higher).get(key))); + } + } + } + return results; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedKeyedStorage.java ---------------------------------------------------------------------- diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedKeyedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedKeyedStorage.java index 7dbfbb1..4b47edc 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedKeyedStorage.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedKeyedStorage.java @@ -18,31 +18,23 @@ */ package org.apache.apex.malhar.lib.window.impl; -import java.util.AbstractMap; -import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.Set; -import org.apache.apex.malhar.lib.state.spillable.Spillable; -import org.apache.apex.malhar.lib.window.SessionWindowedStorage; import org.apache.apex.malhar.lib.window.Window; -import org.apache.apex.malhar.lib.window.WindowedKeyedStorage; +import org.apache.apex.malhar.lib.window.WindowedStorage; import org.apache.hadoop.classification.InterfaceStability; /** * This is the in-memory implementation of {@link WindowedKeyedStorage}. Do not use this class if you have a large state that - * can't be fit in memory. Also, this class may go away soon as there are plans to incorporate {@link Spillable} data structures - * in the near future. + * can't be fit in memory. 
* * @since 3.5.0 */ @InterfaceStability.Unstable public class InMemoryWindowedKeyedStorage<K, V> extends InMemoryWindowedStorage<Map<K, V>> - implements WindowedKeyedStorage<K, V>, SessionWindowedStorage<K, V> + implements WindowedStorage.WindowedKeyedStorage<K, V> { @Override public void put(Window window, K key, V value) @@ -51,14 +43,13 @@ public class InMemoryWindowedKeyedStorage<K, V> extends InMemoryWindowedStorage< if (map.containsKey(window)) { kvMap = map.get(window); } else { - kvMap = new HashMap<K, V>(); + kvMap = new HashMap<>(); map.put(window, kvMap); } kvMap.put(key, value); } - @Override - public Set<Map.Entry<K, V>> entrySet(Window window) + public Iterable<Map.Entry<K, V>> entries(Window window) { if (map.containsKey(window)) { return map.get(window).entrySet(); @@ -77,27 +68,4 @@ public class InMemoryWindowedKeyedStorage<K, V> extends InMemoryWindowedStorage< } } - @Override - public Collection<Map.Entry<Window.SessionWindow<K>, V>> getSessionEntries(K key, long timestamp, long gap) - { - List<Map.Entry<Window.SessionWindow<K>, V>> results = new ArrayList<>(); - // TODO: this is inefficient, but this is usually not used in a real use case since it's in memory - for (Map.Entry<Window, Map<K, V>> entry : map.entrySet()) { - Window.SessionWindow<K> window = (Window.SessionWindow<K>)entry.getKey(); - if (key.equals(window.getKey())) { - if (timestamp > window.getBeginTimestamp()) { - if (window.getBeginTimestamp() + window.getDurationMillis() + gap > timestamp) { - results.add(new AbstractMap.SimpleEntry<>(window, entry.getValue().get(key))); - } - } else if (timestamp < window.getBeginTimestamp()) { - if (window.getBeginTimestamp() - gap <= timestamp) { - results.add(new AbstractMap.SimpleEntry<>(window, entry.getValue().get(key))); - } - } else { - results.add(new AbstractMap.SimpleEntry<>(window, entry.getValue().get(key))); - } - } - } - return results; - } } 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java index f6de894..db18a40 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java @@ -18,7 +18,6 @@ */ package org.apache.apex.malhar.lib.window.impl; -import java.util.Iterator; import java.util.Map; import java.util.TreeMap; @@ -27,17 +26,19 @@ import org.apache.apex.malhar.lib.window.Window; import org.apache.apex.malhar.lib.window.WindowedStorage; import org.apache.hadoop.classification.InterfaceStability; +import com.datatorrent.api.Context; + /** - * This is the in-memory implementation of {@link WindowedStorage}. Do not use this class if you have a large state that + * This is the in-memory implementation of {@link WindowedPlainStorage}. Do not use this class if you have a large state that * can't be fit in memory. Also, this class may go away soon as there are plans to incorporate {@link Spillable} data * structures in the near future. 
* * @since 3.5.0 */ @InterfaceStability.Unstable -public class InMemoryWindowedStorage<T> implements WindowedStorage<T> +public class InMemoryWindowedStorage<T> implements WindowedStorage.WindowedPlainStorage<T> { - protected final TreeMap<Window, T> map = new TreeMap<>(Window.DEFAULT_COMPARATOR); + protected final TreeMap<Window, T> map = new TreeMap<>(); @Override public long size() @@ -70,22 +71,19 @@ public class InMemoryWindowedStorage<T> implements WindowedStorage<T> } @Override - public void migrateWindow(Window fromWindow, Window toWindow) + public Iterable<Map.Entry<Window, T>> entries() { - if (containsWindow(fromWindow)) { - map.put(toWindow, map.remove(fromWindow)); - } + return map.entrySet(); } @Override - public Iterable<Map.Entry<Window, T>> entrySet() + public void setup(Context.OperatorContext context) { - return map.entrySet(); } @Override - public Iterator<Map.Entry<Window, T>> iterator() + public void teardown() { - return map.entrySet().iterator(); } + } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java index 7077c96..a38207a 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java @@ -19,8 +19,8 @@ package org.apache.apex.malhar.lib.window.impl; import java.util.Collection; +import java.util.Collections; import java.util.Iterator; -import java.util.List; import java.util.Map; import org.apache.apex.malhar.lib.window.Accumulation; @@ -30,7 +30,7 @@ import org.apache.apex.malhar.lib.window.Tuple; import 
org.apache.apex.malhar.lib.window.Window; import org.apache.apex.malhar.lib.window.WindowOption; import org.apache.apex.malhar.lib.window.WindowState; -import org.apache.apex.malhar.lib.window.WindowedKeyedStorage; +import org.apache.apex.malhar.lib.window.WindowedStorage; import org.apache.hadoop.classification.InterfaceStability; import com.datatorrent.lib.util.KeyValPair; @@ -48,11 +48,11 @@ import com.datatorrent.lib.util.KeyValPair; */ @InterfaceStability.Evolving public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT> - extends AbstractWindowedOperator<KeyValPair<KeyT, InputValT>, KeyValPair<KeyT, OutputValT>, WindowedKeyedStorage<KeyT, AccumT>, WindowedKeyedStorage<KeyT, OutputValT>, Accumulation<InputValT, AccumT, OutputValT>> + extends AbstractWindowedOperator<KeyValPair<KeyT, InputValT>, KeyValPair<KeyT, OutputValT>, WindowedStorage.WindowedKeyedStorage<KeyT, AccumT>, WindowedStorage.WindowedKeyedStorage<KeyT, OutputValT>, Accumulation<InputValT, AccumT, OutputValT>> { @Override - protected void assignSessionWindows(List<Window> windows, long timestamp, Tuple<KeyValPair<KeyT, InputValT>> inputTuple) + protected Collection<Window.SessionWindow> assignSessionWindows(long timestamp, Tuple<KeyValPair<KeyT, InputValT>> inputTuple) { KeyT key = inputTuple.getValue().getKey(); WindowOption.SessionWindows sessionWindowOption = (WindowOption.SessionWindows)windowOption; @@ -126,7 +126,7 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT> default: throw new IllegalStateException("There are more than two sessions matching one timestamp"); } - windows.add(sessionWindowToAssign); + return Collections.<Window.SessionWindow>singleton(sessionWindowToAssign); } @Override @@ -147,7 +147,7 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT> @Override public void fireNormalTrigger(Window window, boolean fireOnlyUpdatedPanes) { - for (Map.Entry<KeyT, AccumT> entry : dataStorage.entrySet(window)) { + for 
(Map.Entry<KeyT, AccumT> entry : dataStorage.entries(window)) { OutputValT outputVal = accumulation.getOutput(entry.getValue()); if (fireOnlyUpdatedPanes) { OutputValT oldValue = retractionStorage.get(window, entry.getKey()); @@ -168,7 +168,7 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT> if (triggerOption.getAccumulationMode() != TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) { throw new UnsupportedOperationException(); } - for (Map.Entry<KeyT, OutputValT> entry : retractionStorage.entrySet(window)) { + for (Map.Entry<KeyT, OutputValT> entry : retractionStorage.entries(window)) { output.emit(new Tuple.WindowedTuple<>(window, new KeyValPair<>(entry.getKey(), accumulation.getRetraction(entry.getValue())))); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java index d195004..7275d88 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java @@ -36,7 +36,7 @@ import org.apache.hadoop.classification.InterfaceStability; */ @InterfaceStability.Evolving public class WindowedOperatorImpl<InputT, AccumT, OutputT> - extends AbstractWindowedOperator<InputT, OutputT, WindowedStorage<AccumT>, WindowedStorage<OutputT>, Accumulation<InputT, AccumT, OutputT>> + extends AbstractWindowedOperator<InputT, OutputT, WindowedStorage.WindowedPlainStorage<AccumT>, WindowedStorage.WindowedPlainStorage<OutputT>, Accumulation<InputT, AccumT, OutputT>> { @Override public void accumulateTuple(Tuple.WindowedTuple<InputT> tuple) 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java index f8f9d8a..7396994 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java @@ -18,9 +18,8 @@ */ package org.apache.apex.malhar.lib.window; -import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; import javax.validation.ValidationException; @@ -29,6 +28,7 @@ import org.joda.time.Duration; import org.junit.Assert; import org.junit.Test; +import org.apache.apex.malhar.lib.window.impl.InMemorySessionWindowedStorage; import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage; import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage; import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl; @@ -112,8 +112,8 @@ public class WindowedOperatorTest windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000))); windowedOperator.setAllowedLateness(Duration.millis(1000)); - WindowedStorage<MutableLong> dataStorage = new InMemoryWindowedStorage<>(); - WindowedStorage<WindowState> windowStateStorage = new InMemoryWindowedStorage<>(); + WindowedStorage.WindowedPlainStorage<MutableLong> dataStorage = new InMemoryWindowedStorage<>(); + WindowedStorage.WindowedPlainStorage<WindowState> windowStateStorage = new InMemoryWindowedStorage<>(); windowedOperator.setDataStorage(dataStorage); windowedOperator.setWindowStateStorage(windowStateStorage); @@ -124,7 +124,7 @@ public class WindowedOperatorTest Assert.assertEquals("There should 
be exactly one window in the storage", 1, dataStorage.size()); Assert.assertEquals("There should be exactly one window in the storage", 1, windowStateStorage.size()); - Map.Entry<Window, WindowState> entry = windowStateStorage.entrySet().iterator().next(); + Map.Entry<Window, WindowState> entry = windowStateStorage.entries().iterator().next(); Window window = entry.getKey(); WindowState windowState = entry.getValue(); Assert.assertEquals(-1, windowState.watermarkArrivalTime); @@ -135,7 +135,7 @@ public class WindowedOperatorTest windowedOperator.processWatermark(new WatermarkImpl(1200)); windowedOperator.endWindow(); - Assert.assertTrue(windowState.watermarkArrivalTime > 0); + Assert.assertTrue(windowState.watermarkArrivalTime >= 0); Assert.assertEquals("We should get one watermark tuple", 1, controlSink.getCount(false)); windowedOperator.beginWindow(2); @@ -285,9 +285,9 @@ public class WindowedOperatorTest windowedOperator.setWindowOption(new WindowOption.GlobalWindow()); windowedOperator.setup(context); Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(1100L, 2L)); - List<Window> windows = windowedValue.getWindows(); + Collection<? extends Window> windows = windowedValue.getWindows(); Assert.assertEquals(1, windows.size()); - Assert.assertEquals(Window.GLOBAL_WINDOW, windows.get(0)); + Assert.assertEquals(Window.GlobalWindow.INSTANCE, windows.iterator().next()); } @Test @@ -299,10 +299,11 @@ public class WindowedOperatorTest windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000))); windowedOperator.setup(context); Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(1100L, 2L)); - List<Window> windows = windowedValue.getWindows(); + Collection<? 
extends Window> windows = windowedValue.getWindows(); Assert.assertEquals(1, windows.size()); - Assert.assertEquals(1000, windows.get(0).getBeginTimestamp()); - Assert.assertEquals(1000, windows.get(0).getDurationMillis()); + Window window = windows.iterator().next(); + Assert.assertEquals(1000, window.getBeginTimestamp()); + Assert.assertEquals(1000, window.getDurationMillis()); } @Test @@ -314,9 +315,8 @@ public class WindowedOperatorTest windowedOperator.setWindowOption(new WindowOption.SlidingTimeWindows(Duration.millis(1000), Duration.millis(200))); windowedOperator.setup(context); Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(1600L, 2L)); - List<Window> windows = windowedValue.getWindows(); + Collection<? extends Window> windows = windowedValue.getWindows(); Window[] winArray = windows.toArray(new Window[]{}); - Arrays.sort(winArray, Window.DEFAULT_COMPARATOR); Assert.assertEquals(5, winArray.length); Assert.assertEquals(800, winArray[0].getBeginTimestamp()); Assert.assertEquals(1000, winArray[0].getDurationMillis()); @@ -336,6 +336,8 @@ public class WindowedOperatorTest OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1, new Attribute.AttributeMap.DefaultAttributeMap()); KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator(); + windowedOperator.setDataStorage(new InMemorySessionWindowedStorage<String, MutableLong>()); + windowedOperator.setRetractionStorage(new InMemorySessionWindowedStorage<String, Long>()); windowedOperator.setWindowOption(new WindowOption.SessionWindows(Duration.millis(2000))); windowedOperator.setTriggerOption(new TriggerOption().withEarlyFiringsAtEvery(1).accumulatingAndRetractingFiredPanes().firingOnlyUpdatedPanes()); CollectorTestSink<Tuple<KeyValPair<String, Long>>> sink = new CollectorTestSink(); @@ -348,7 +350,7 @@ public class WindowedOperatorTest 
Assert.assertEquals(1, sink.getCount(false)); Tuple.WindowedTuple<KeyValPair<String, Long>> out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(0); Assert.assertEquals(1, out.getWindows().size()); - Window.SessionWindow<String> window1 = (Window.SessionWindow<String>)out.getWindows().get(0); + Window.SessionWindow<String> window1 = (Window.SessionWindow<String>)out.getWindows().iterator().next(); Assert.assertEquals(1100L, window1.getBeginTimestamp()); Assert.assertEquals(1, window1.getDurationMillis()); Assert.assertEquals("a", window1.getKey()); @@ -364,13 +366,13 @@ public class WindowedOperatorTest // retraction trigger out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(0); Assert.assertEquals(1, out.getWindows().size()); - Assert.assertEquals(window1, out.getWindows().get(0)); + Assert.assertEquals(window1, out.getWindows().iterator().next()); Assert.assertEquals("a", out.getValue().getKey()); Assert.assertEquals(-2L, out.getValue().getValue().longValue()); // normal trigger out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(1); - Window.SessionWindow<String> window2 = (Window.SessionWindow<String>)out.getWindows().get(0); + Window.SessionWindow<String> window2 = (Window.SessionWindow<String>)out.getWindows().iterator().next(); Assert.assertEquals(1100L, window2.getBeginTimestamp()); Assert.assertEquals(901, window2.getDurationMillis()); @@ -384,7 +386,7 @@ public class WindowedOperatorTest Assert.assertEquals(1, sink.getCount(false)); out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(0); Assert.assertEquals(1, out.getWindows().size()); - Window.SessionWindow<String> window3 = (Window.SessionWindow<String>)out.getWindows().get(0); + Window.SessionWindow<String> window3 = (Window.SessionWindow<String>)out.getWindows().iterator().next(); Assert.assertEquals(5000L, window3.getBeginTimestamp()); Assert.assertEquals(1, window3.getDurationMillis()); 
Assert.assertEquals("a", out.getValue().getKey()); @@ -400,19 +402,19 @@ public class WindowedOperatorTest // retraction of the two old windows out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(0); Assert.assertEquals(1, out.getWindows().size()); - Assert.assertEquals(window2, out.getWindows().get(0)); + Assert.assertEquals(window2, out.getWindows().iterator().next()); Assert.assertEquals("a", out.getValue().getKey()); Assert.assertEquals(-5L, out.getValue().getValue().longValue()); out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(1); Assert.assertEquals(1, out.getWindows().size()); - Assert.assertEquals(window3, out.getWindows().get(0)); + Assert.assertEquals(window3, out.getWindows().iterator().next()); Assert.assertEquals("a", out.getValue().getKey()); Assert.assertEquals(-4L, out.getValue().getValue().longValue()); // normal trigger out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(2); Assert.assertEquals(1, out.getWindows().size()); - Window.SessionWindow<String> window4 = (Window.SessionWindow<String>)out.getWindows().get(0); + Window.SessionWindow<String> window4 = (Window.SessionWindow<String>)out.getWindows().iterator().next(); Assert.assertEquals(1100L, window4.getBeginTimestamp()); Assert.assertEquals(3901, window4.getDurationMillis()); Assert.assertEquals("a", out.getValue().getKey()); @@ -428,7 +430,7 @@ public class WindowedOperatorTest new Attribute.AttributeMap.DefaultAttributeMap()); KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator(); windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000))); - WindowedKeyedStorage<String, MutableLong> dataStorage = new InMemoryWindowedKeyedStorage<>(); + WindowedStorage.WindowedKeyedStorage<String, MutableLong> dataStorage = new InMemoryWindowedKeyedStorage<>(); windowedOperator.setDataStorage(dataStorage); windowedOperator.setup(context); 
windowedOperator.beginWindow(1); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java index 04f42b3..f9a4ed8 100644 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java @@ -18,10 +18,7 @@ */ package org.apache.apex.malhar.stream.api.util; -import java.util.List; - import org.apache.apex.malhar.lib.window.Tuple; -import org.apache.apex.malhar.lib.window.Window; /** * The tuple util will be used to extract fields that are used as key or value<br> @@ -38,14 +35,8 @@ public class TupleUtil { if (t instanceof Tuple.WindowedTuple) { - Tuple.WindowedTuple<O> newT = new Tuple.WindowedTuple<>(); - List<Window> wins = ((Tuple.WindowedTuple)t).getWindows(); - for (Window w : wins) { - newT.addWindow(w); - } - newT.setValue(newValue); - ((Tuple.WindowedTuple)t).setTimestamp(((Tuple.WindowedTuple)t).getTimestamp()); - return newT; + Tuple.WindowedTuple windowedTuple = (Tuple.WindowedTuple)t; + return new Tuple.WindowedTuple<>(windowedTuple.getWindows(), windowedTuple.getTimestamp(), newValue); } else if (t instanceof Tuple.TimestampedTuple) { return new Tuple.TimestampedTuple<>(((Tuple.TimestampedTuple)t).getTimestamp(), newValue); } else {
