Repository: flink
Updated Branches:
  refs/heads/master 62192c783 -> 74bb7bb63


[FLINK-4174] Enhance evictor functionality


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/74bb7bb6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/74bb7bb6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/74bb7bb6

Branch: refs/heads/master
Commit: 74bb7bb63919ce6de5736d52e4e5a254cf9b6509
Parents: 62192c7
Author: Vishnu Viswanath <[email protected]>
Authored: Mon Oct 31 18:21:04 2016 -0500
Committer: Aljoscha Krettek <[email protected]>
Committed: Tue Nov 15 10:05:24 2016 +0100

----------------------------------------------------------------------
 .../api/windowing/evictors/CountEvictor.java    |  54 ++-
 .../api/windowing/evictors/DeltaEvictor.java    |  56 ++-
 .../api/windowing/evictors/Evictor.java         |  52 ++-
 .../api/windowing/evictors/TimeEvictor.java     |  84 +++-
 .../windowing/EvictingWindowOperator.java       | 112 ++++-
 .../operators/windowing/TimestampedValue.java   | 112 +++++
 .../operators/windowing/WindowOperator.java     |   4 +-
 .../windowing/EvictingWindowOperatorTest.java   | 462 +++++++++++++++++++
 8 files changed, 886 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/74bb7bb6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
index dc82521..8f5b2d9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
@@ -19,7 +19,9 @@ package org.apache.flink.streaming.api.windowing.evictors;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
+
+import java.util.Iterator;
 
 /**
  * An {@link Evictor} that keeps up to a certain amount of elements.
@@ -31,26 +33,68 @@ public class CountEvictor<W extends Window> implements 
Evictor<Object, W> {
        private static final long serialVersionUID = 1L;
 
        private final long maxCount;
+       private final boolean doEvictAfter;
+
+       private CountEvictor(long count,boolean doEvictAfter) {
+               this.maxCount = count;
+               this.doEvictAfter = doEvictAfter;
+       }
 
        private CountEvictor(long count) {
                this.maxCount = count;
+               this.doEvictAfter = false;
        }
 
        @Override
-       public int evict(Iterable<StreamRecord<Object>> elements, int size, W 
window) {
-               if (size > maxCount) {
-                       return (int) (size - maxCount);
+       public void evictBefore(Iterable<TimestampedValue<Object>> elements, 
int size, W window, EvictorContext ctx) {
+               if (!doEvictAfter) {
+                       evict(elements, size, ctx);
+               }
+       }
+
+
+       @Override
+       public void evictAfter(Iterable<TimestampedValue<Object>> elements, int 
size,W window, EvictorContext ctx) {
+               if (doEvictAfter) {
+                       evict(elements, size, ctx);
+               }
+       }
+
+       private void evict(Iterable<TimestampedValue<Object>> elements, int 
size, EvictorContext ctx) {
+               if (size <= maxCount) {
+                       return;
                } else {
-                       return 0;
+                       int evictedCount = 0;
+                       for (Iterator<TimestampedValue<Object>> iterator = 
elements.iterator(); iterator.hasNext();){
+                               iterator.next();
+                               evictedCount++;
+                               if (evictedCount > size - maxCount) {
+                                       break;
+                               } else {
+                                       iterator.remove();
+                               }
+                       }
                }
        }
 
        /**
         * Creates a {@code CountEvictor} that keeps the given number of 
elements.
+        * Eviction is done before the window function.
         *
         * @param maxCount The number of elements to keep in the pane.
         */
        public static <W extends Window> CountEvictor<W> of(long maxCount) {
                return new CountEvictor<>(maxCount);
        }
+
+       /**
+        * Creates a {@code CountEvictor} that keeps the given number of 
elements in the pane
+        * Eviction is done before/after the window function based on the value 
of doEvictAfter.
+        *
+        * @param maxCount The number of elements to keep in the pane.
+        * @param doEvictAfter Whether to do eviction after the window function.
+     */
+       public static <W extends Window> CountEvictor<W> of(long maxCount, 
boolean doEvictAfter) {
+               return new CountEvictor<>(maxCount,doEvictAfter);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/74bb7bb6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
index ef4dad6..7ae33b8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
@@ -21,15 +21,16 @@ import com.google.common.collect.Iterables;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
 import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
+
+import java.util.Iterator;
 
 /**
  * An {@link Evictor} that keeps elements based on a {@link DeltaFunction} and 
a threshold.
  *
  * <p>
  * Eviction starts from the first element of the buffer and removes all 
elements from the buffer
- * which have a higher delta then the threshold. As soon as there is an 
element with a lower delta,
- * the eviction stops.
+ * which have a higher delta then the threshold.
  *
  * @param <W> The type of {@link Window Windows} on which this {@code Evictor} 
can operate.
  */
@@ -39,24 +40,42 @@ public class DeltaEvictor<T, W extends Window> implements 
Evictor<T, W> {
 
        DeltaFunction<T> deltaFunction;
        private double threshold;
+       private final boolean doEvictAfter;
 
        private DeltaEvictor(double threshold, DeltaFunction<T> deltaFunction) {
                this.deltaFunction = deltaFunction;
                this.threshold = threshold;
+               this.doEvictAfter = false;
+       }
+
+       private DeltaEvictor(double threshold, DeltaFunction<T> deltaFunction, 
boolean doEvictAfter) {
+               this.deltaFunction = deltaFunction;
+               this.threshold = threshold;
+               this.doEvictAfter = doEvictAfter;
        }
 
        @Override
-       public int evict(Iterable<StreamRecord<T>> elements, int size, W 
window) {
-               StreamRecord<T> lastElement = Iterables.getLast(elements);
-               int toEvict = 0;
-               for (StreamRecord<T> element : elements) {
-                       if (deltaFunction.getDelta(element.getValue(), 
lastElement.getValue()) < this.threshold) {
-                               break;
-                       }
-                       toEvict++;
+       public void evictBefore(Iterable<TimestampedValue<T>> elements, int 
size, W window, EvictorContext ctx) {
+               if (!doEvictAfter) {
+                       evict(elements, size, ctx);
                }
+       }
 
-               return toEvict;
+       @Override
+       public void evictAfter(Iterable<TimestampedValue<T>> elements, int 
size, W window, EvictorContext ctx) {
+               if (doEvictAfter) {
+                       evict(elements, size, ctx);
+               }
+       }
+
+       private void evict(Iterable<TimestampedValue<T>> elements, int size, 
EvictorContext ctx) {
+               TimestampedValue<T> lastElement = Iterables.getLast(elements);
+               for (Iterator<TimestampedValue<T>> iterator = 
elements.iterator(); iterator.hasNext();){
+                       TimestampedValue<T> element = iterator.next();
+                       if (deltaFunction.getDelta(element.getValue(), 
lastElement.getValue()) >= this.threshold) {
+                               iterator.remove();
+                       }
+               }
        }
 
        @Override
@@ -66,6 +85,7 @@ public class DeltaEvictor<T, W extends Window> implements 
Evictor<T, W> {
 
        /**
         * Creates a {@code DeltaEvictor} from the given threshold and {@code 
DeltaFunction}.
+        * Eviction is done before the window function.
         *
         * @param threshold The threshold
         * @param deltaFunction The {@code DeltaFunction}
@@ -73,4 +93,16 @@ public class DeltaEvictor<T, W extends Window> implements 
Evictor<T, W> {
        public static <T, W extends Window> DeltaEvictor<T, W> of(double 
threshold, DeltaFunction<T> deltaFunction) {
                return new DeltaEvictor<>(threshold, deltaFunction);
        }
+
+       /**
+        * Creates a {@code DeltaEvictor} from the given threshold, {@code 
DeltaFunction}.
+        * Eviction is done before/after the window function based on the value 
of doEvictAfter.
+        *
+        * @param threshold The threshold
+        * @param deltaFunction The {@code DeltaFunction}
+        * @param doEvictAfter Whether eviction should be done after window 
function
+     */
+       public static <T, W extends Window> DeltaEvictor<T, W> of(double 
threshold, DeltaFunction<T> deltaFunction, boolean doEvictAfter) {
+               return new DeltaEvictor<>(threshold, deltaFunction, 
doEvictAfter);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/74bb7bb6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
index d8e0daa..02e93eb 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
@@ -18,14 +18,17 @@
 package org.apache.flink.streaming.api.windowing.evictors;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+
 import java.io.Serializable;
 
 /**
- * An {@code Evictor} can remove elements from a pane before it is being 
processed and after
- * window evaluation was triggered by a
- * {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
+ * An {@code Evictor} can remove elements from a pane before/after the 
evaluation of WindowFunction and
+ * after the window evaluation gets triggered by a {@link 
org.apache.flink.streaming.api.windowing.triggers.Trigger}
  *
  * <p>
  * A pane is the bucket of elements that have the same key (assigned by the
@@ -41,13 +44,48 @@ import java.io.Serializable;
 public interface Evictor<T, W extends Window> extends Serializable {
 
        /**
-        * Computes how many elements should be removed from the pane. The 
result specifies how
-        * many elements should be removed from the beginning.
+        * Optionally evicts elements. Called before windowing function.
+        *
+        * @param elements The elements currently in the pane.
+        * @param size The current number of elements in the pane.
+        * @param window The {@link Window}
+        * @param evictorContext The context for the Evictor
+     */
+       void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W 
window, EvictorContext evictorContext);
+
+       /**
+        * Optionally evicts elements. Called after windowing function.
         *
         * @param elements The elements currently in the pane.
         * @param size The current number of elements in the pane.
         * @param window The {@link Window}
+        * @param evictorContext The context for the Evictor
         */
-       int evict(Iterable<StreamRecord<T>> elements, int size, W window);
+       void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W 
window, EvictorContext evictorContext);
+
+
+       /**
+        * A context object that is given to {@link Evictor} methods
+        */
+       interface EvictorContext {
+
+               /**
+                * Returns the current processing time, as returned by
+                * the {@link ProcessingTimeService#getCurrentProcessingTime}.
+                */
+               long getCurrentProcessingTime();
+
+               /**
+                * Returns the metric group for this {@link Evictor}. This is 
the same metric
+                * group that would be returned from {@link 
RuntimeContext#getMetricGroup()} in a user
+                * function.
+                *
+                * <p>You must not call methods that create metric objects
+                * (such as {@link MetricGroup#counter(int)} multiple times but 
instead call once
+                * and store the metric object in a field.
+                */
+               MetricGroup getMetricGroup();
+
+       }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/74bb7bb6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
index 369a7ae..33d1cb5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
@@ -17,16 +17,18 @@
  */
 package org.apache.flink.streaming.api.windowing.evictors;
 
-import com.google.common.collect.Iterables;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
+
+import java.util.Iterator;
 
 /**
  * An {@link Evictor} that keeps elements for a certain amount of time. 
Elements older
- * than {@code current_time - keep_time} are evicted.
+ * than {@code current_time - keep_time} are evicted. The current_time is time 
associated
+ * with {@link TimestampedValue}
  *
  * @param <W> The type of {@link Window Windows} on which this {@code Evictor} 
can operate.
  */
@@ -35,23 +37,71 @@ public class TimeEvictor<W extends Window> implements 
Evictor<Object, W> {
        private static final long serialVersionUID = 1L;
 
        private final long windowSize;
+       private final boolean doEvictAfter;
 
        public TimeEvictor(long windowSize) {
                this.windowSize = windowSize;
+               this.doEvictAfter = false;
+       }
+
+       public TimeEvictor(long windowSize, boolean doEvictAfter) {
+               this.windowSize = windowSize;
+               this.doEvictAfter = doEvictAfter;
        }
 
+
        @Override
-       public int evict(Iterable<StreamRecord<Object>> elements, int size, W 
window) {
-               int toEvict = 0;
-               long currentTime = Iterables.getLast(elements).getTimestamp();
+       public void evictBefore(Iterable<TimestampedValue<Object>> elements, 
int size, W window, EvictorContext ctx) {
+               if(!doEvictAfter) {
+                       evict(elements,size,ctx);
+               }
+       }
+
+       @Override
+       public void evictAfter(Iterable<TimestampedValue<Object>> elements, int 
size, W window, EvictorContext ctx) {
+               if(doEvictAfter) {
+                       evict(elements,size,ctx);
+               }
+       }
+
+       private void evict(Iterable<TimestampedValue<Object>> elements, int 
size, EvictorContext ctx) {
+               if (!hasTimestamp(elements)) {
+                       return;
+               }
+
+               long currentTime = getMaxTimestamp(elements);
                long evictCutoff = currentTime - windowSize;
-               for (StreamRecord<Object> record: elements) {
-                       if (record.getTimestamp() > evictCutoff) {
-                               break;
+
+               for (Iterator<TimestampedValue<Object>> iterator = 
elements.iterator(); iterator.hasNext(); ) {
+                       TimestampedValue<Object> record = iterator.next();
+                       if (record.getTimestamp() <= evictCutoff) {
+                               iterator.remove();
                        }
-                       toEvict++;
                }
-               return toEvict;
+       }
+
+       /**
+        * Returns true if the first element in the Iterable of {@link 
TimestampedValue} has a timestamp.
+     */
+       private boolean hasTimestamp(Iterable<TimestampedValue<Object>> 
elements) {
+               Iterator<TimestampedValue<Object>> it = elements.iterator();
+               if (it.hasNext()) {
+                       return it.next().hasTimestamp();
+               }
+               return false;
+       }
+
+       /**
+        * @param elements The elements currently in the pane.
+        * @return The maximum value of timestamp among the elements.
+     */
+       private long getMaxTimestamp(Iterable<TimestampedValue<Object>> 
elements) {
+               long currentTime = Long.MIN_VALUE;
+               for (Iterator<TimestampedValue<Object>> iterator = 
elements.iterator(); iterator.hasNext();){
+                       TimestampedValue<Object> record = iterator.next();
+                       currentTime = Math.max(currentTime, 
record.getTimestamp());
+               }
+               return currentTime;
        }
 
        @Override
@@ -66,10 +116,22 @@ public class TimeEvictor<W extends Window> implements 
Evictor<Object, W> {
 
        /**
         * Creates a {@code TimeEvictor} that keeps the given number of 
elements.
+        * Eviction is done before the window function.
         *
         * @param windowSize The amount of time for which to keep elements.
         */
        public static <W extends Window> TimeEvictor<W> of(Time windowSize) {
                return new TimeEvictor<>(windowSize.toMilliseconds());
        }
+
+       /**
+        * Creates a {@code TimeEvictor} that keeps the given number of 
elements.
+        * Eviction is done before/after the window function based on the value 
of doEvictAfter.
+        *
+        * @param windowSize The amount of time for which to keep elements.
+        * @param doEvictAfter Whether eviction is done after window function.
+     */
+       public static <W extends Window> TimeEvictor<W> of(Time windowSize, 
boolean doEvictAfter) {
+               return new 
TimeEvictor<>(windowSize.toMilliseconds(),doEvictAfter);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/74bb7bb6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index f9b409e..3be3f5a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.operators.InternalTimer;
 import 
org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
@@ -47,8 +48,8 @@ import static java.util.Objects.requireNonNull;
  * A {@link WindowOperator} that also allows an {@link Evictor} to be used.
  *
  * <p>
- * The {@code Evictor} is used to evict elements from panes before processing 
a window and after
- * a {@link Trigger} has fired.
+ * The {@code Evictor} is used to remove elements from a pane before/after the 
evaluation of WindowFunction and
+ * after the window evaluation gets triggered by a {@link 
org.apache.flink.streaming.api.windowing.triggers.Trigger}.
  *
  * @param <K> The type of key returned by the {@code KeySelector}.
  * @param <IN> The type of the incoming elements.
@@ -62,6 +63,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
 
        private final Evictor<? super IN, ? super W> evictor;
 
+       protected transient EvictorContext evictorContext = new 
EvictorContext(null, null);
+
        private final StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> 
windowStateDescriptor;
 
        public EvictingWindowOperator(WindowAssigner<? super IN, W> 
windowAssigner,
@@ -146,6 +149,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
 
                                context.key = key;
                                context.window = actualWindow;
+                               evictorContext.key = key;
+                               evictorContext.window = actualWindow;
 
                                // we might have already fired because of a 
merge but still call onElement
                                // on the (possibly merged) window
@@ -158,7 +163,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
                                                // if we have no state, there 
is nothing to do
                                                continue;
                                        }
-                                       fire(actualWindow, contents);
+                                       fire(actualWindow, contents, 
windowState);
                                }
 
                                if (combinedTriggerResult.isPurge()) {
@@ -183,6 +188,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
 
                                context.key = key;
                                context.window = window;
+                               evictorContext.key = key;
+                               evictorContext.window = window;
 
                                TriggerResult triggerResult = 
context.onElement(element);
 
@@ -192,7 +199,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
                                                // if we have no state, there 
is nothing to do
                                                continue;
                                        }
-                                       fire(window, contents);
+                                       fire(window, contents, windowState);
                                }
 
                                if (triggerResult.isPurge()) {
@@ -209,6 +216,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
 
                context.key = timer.getKey();
                context.window = timer.getNamespace();
+               evictorContext.key = timer.getKey();
+               evictorContext.window = timer.getNamespace();
 
                ListState<StreamRecord<IN>> windowState;
                MergingWindowSet<W> mergingWindows = null;
@@ -238,7 +247,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
 
                TriggerResult triggerResult = 
context.onEventTime(timer.getTimestamp());
                if (triggerResult.isFire()) {
-                       fire(context.window, contents);
+                       fire(context.window, contents, windowState);
                }
 
                if (triggerResult.isPurge() || (windowAssigner.isEventTime() && 
isCleanupTime(context.window, timer.getTimestamp()))) {
@@ -250,6 +259,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
        public void onProcessingTime(InternalTimer<K, W> timer) throws 
Exception {
                context.key = timer.getKey();
                context.window = timer.getNamespace();
+               evictorContext.key = timer.getKey();
+               evictorContext.window = timer.getNamespace();
 
                ListState<StreamRecord<IN>> windowState;
                MergingWindowSet<W> mergingWindows = null;
@@ -276,7 +287,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
 
                TriggerResult triggerResult = 
context.onProcessingTime(timer.getTimestamp());
                if (triggerResult.isFire()) {
-                       fire(context.window, contents);
+                       fire(context.window, contents, windowState);
                }
 
                if (triggerResult.isPurge() || (!windowAssigner.isEventTime() 
&& isCleanupTime(context.window, timer.getTimestamp()))) {
@@ -284,22 +295,79 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
                }
        }
 
-       private void fire(W window, Iterable<StreamRecord<IN>> contents) throws 
Exception {
+       private void fire(W window, Iterable<StreamRecord<IN>> contents, 
ListState<StreamRecord<IN>> windowState) throws Exception {
                
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
 
                // Work around type system restrictions...
-               int toEvict = evictor.evict((Iterable) contents, 
Iterables.size(contents), context.window);
-
-               FluentIterable<IN> projectedContents = FluentIterable
+               FluentIterable<TimestampedValue<IN>> recordsWithTimestamp = 
FluentIterable
                        .from(contents)
-                       .skip(toEvict)
-                       .transform(new Function<StreamRecord<IN>, IN>() {
+                       .transform(new Function<StreamRecord<IN>, 
TimestampedValue<IN>>() {
+                               @Override
+                               public TimestampedValue<IN> 
apply(StreamRecord<IN> input) {
+                                       return TimestampedValue.from(input);
+                               }
+                       });
+               evictorContext.evictBefore(recordsWithTimestamp, 
Iterables.size(recordsWithTimestamp));
+
+               FluentIterable<IN> projectedContents = recordsWithTimestamp
+                       .transform(new Function<TimestampedValue<IN>, IN>() {
                                @Override
-                               public IN apply(StreamRecord<IN> input) {
+                               public IN apply(TimestampedValue<IN> input) {
                                        return input.getValue();
                                }
                        });
+
                userFunction.apply(context.key, context.window, 
projectedContents, timestampedCollector);
+               evictorContext.evictAfter(recordsWithTimestamp, 
Iterables.size(recordsWithTimestamp));
+
+
+               //work around to fix FLINK-4369, remove the evicted elements 
from the windowState.
+               //this is inefficient, but there is no other way to remove 
elements from ListState, which is an AppendingState.
+               windowState.clear();
+               for(TimestampedValue<IN> record : recordsWithTimestamp) {
+                       windowState.add(record.getStreamRecord());
+               }
+       }
+
+
+       /**
+        * {@code EvictorContext} is a utility for handling {@code Evictor} 
invocations. It can be reused
+        * by setting the {@code key} and {@code window} fields. No internal 
state must be kept in
+        * the {@code EvictorContext}.
+        */
+
+       class EvictorContext implements Evictor.EvictorContext {
+
+               protected K key;
+               protected W window;
+
+               public EvictorContext(K key, W window) {
+                       this.key = key;
+                       this.window = window;
+               }
+
+               @Override
+               public long getCurrentProcessingTime() {
+                       return 
EvictingWindowOperator.this.getProcessingTimeService().getCurrentProcessingTime();
+               }
+
+               @Override
+               public MetricGroup getMetricGroup() {
+                       return EvictingWindowOperator.this.getMetricGroup();
+               }
+
+
+               public K getKey() {
+                       return key;
+               }
+
+               void evictBefore(Iterable<TimestampedValue<IN>> elements, int 
size) {
+                       evictor.evictBefore((Iterable)elements, size, window, 
this);
+               }
+
+               void evictAfter(Iterable<TimestampedValue<IN>>  elements, int 
size) {
+                       evictor.evictAfter((Iterable)elements, size, window, 
this);
+               }
        }
 
        private void cleanup(W window,
@@ -314,6 +382,24 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
                context.clear();
        }
 
+       @Override
+       public void open() throws Exception {
+               super.open();
+               evictorContext = new EvictorContext(null,null);
+       }
+
+       @Override
+       public void close() throws Exception {
+               super.close();
+               evictorContext = null;
+       }
+
+       @Override
+       public void dispose() throws Exception{
+               super.dispose();
+               evictorContext = null;
+       }
+
        // 
------------------------------------------------------------------------
        // Getters for testing
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/74bb7bb6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/TimestampedValue.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/TimestampedValue.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/TimestampedValue.java
new file mode 100644
index 0000000..9c63a69
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/TimestampedValue.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * Stores the value and the timestamp of the record.
+ *
+ * @param <T> The type encapsulated value
+ */
+@PublicEvolving
+public class TimestampedValue<T> {
+
+       /** The actual value held by this record */
+       private T value;
+
+       /** The timestamp of the record */
+       private long timestamp;
+
+       /** Flag whether the timestamp is actually set */
+       private boolean hasTimestamp;
+
+       /**
+        * Creates a new TimestampedValue. The record does not have a timestamp.
+        */
+       public TimestampedValue(T value) {
+               this.value = value;
+       }
+
+       /**
+        * Creates a new TimestampedValue wrapping the given value. The 
timestamp is set to the
+        * given timestamp.
+        *
+        * @param value The value to wrap in this {@link TimestampedValue}
+        * @param timestamp The timestamp in milliseconds
+        */
+       public TimestampedValue(T value, long timestamp) {
+               this.value = value;
+               this.timestamp = timestamp;
+               this.hasTimestamp = true;
+       }
+
+       /**
+        * @return The value wrapped in this {@link TimestampedValue}.
+        */
+       public T getValue() {
+               return value;
+       }
+
+       /**
+        * @return The timestamp associated with this stream value in 
milliseconds.
+     */
+       public long getTimestamp() {
+               if (hasTimestamp) {
+                       return timestamp;
+               } else {
+                       throw new IllegalStateException(
+                                       "Record has no timestamp. Is the time 
characteristic set to 'ProcessingTime', or " +
+                                                       "did you forget to call 
'DataStream.assignTimestampsAndWatermarks(...)'?");
+               }
+       }
+
+       /**
+        * Checks whether this record has a timestamp.
+        *
+        * @return True if the record has a timestamp, false if not.
+        */
+       public boolean hasTimestamp() {
+               return hasTimestamp;
+       }
+
+       /**
+        * Creates a {@link StreamRecord} from this TimestampedValue.
+     */
+       public StreamRecord<T> getStreamRecord() {
+               StreamRecord<T> streamRecord = new StreamRecord<>(value);
+               if (hasTimestamp) {
+                       streamRecord.setTimestamp(timestamp);
+               }
+               return streamRecord;
+       }
+
+       /**
+        * Creates a TimestampedValue from given {@link StreamRecord}.
+        *
+        * @param streamRecord The StreamRecord object from which 
TimestampedValue is to be created.
+     */
+       public static <T> TimestampedValue<T> from(StreamRecord<T> 
streamRecord) {
+               if (streamRecord.hasTimestamp()) {
+                       return new TimestampedValue<>(streamRecord.getValue(), 
streamRecord.getTimestamp());
+               } else {
+                       return new TimestampedValue<>(streamRecord.getValue());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/74bb7bb6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 229d97d..6ff3999 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -181,7 +181,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
        }
 
        @Override
-       public final void open() throws Exception {
+       public void open() throws Exception {
                super.open();
 
                timestampedCollector = new TimestampedCollector<>(output);
@@ -200,7 +200,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
        }
 
        @Override
-       public final void close() throws Exception {
+       public void close() throws Exception {
                super.close();
                timestampedCollector = null;
                context = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/74bb7bb6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
index 2e3d090..46495b0 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -29,10 +29,13 @@ import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.streaming.api.functions.windowing.ReduceIterableWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
+import org.apache.flink.streaming.api.windowing.evictors.DeltaEvictor;
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
@@ -58,6 +61,464 @@ public class EvictingWindowOperatorTest {
 
        // For counting if close() is called the correct number of times on the 
SumReducer
 
+       /**
+        * Tests CountEvictor evictAfter behavior
+        * @throws Exception
+     */
+       @Test
+       public void testCountEvictorEvictAfter() throws Exception {
+               AtomicInteger closeCalled = new AtomicInteger(0);
+               final int WINDOW_SIZE = 4;
+               final int TRIGGER_COUNT = 2;
+               final boolean EVICT_AFTER = true;
+
+               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
+
+               @SuppressWarnings({"unchecked", "rawtypes"})
+               TypeSerializer<StreamRecord<Tuple2<String, Integer>>> 
streamRecordSerializer =
+                       (TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) 
new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig()));
+
+               ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> 
stateDesc =
+                       new ListStateDescriptor<>("window-contents", 
streamRecordSerializer);
+
+
+               EvictingWindowOperator<String, Tuple2<String, Integer>, 
Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
+                       GlobalWindows.create(),
+                       new GlobalWindow.Serializer(),
+                       new TupleKeySelector(),
+                       BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new 
ExecutionConfig()),
+                       stateDesc,
+                       new InternalIterableWindowFunction<>(new 
RichSumReducer<GlobalWindow>(closeCalled)),
+                       CountTrigger.of(TRIGGER_COUNT),
+                       CountEvictor.of(WINDOW_SIZE,EVICT_AFTER),
+                       0);
+
+
+               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
+                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+
+               long initialTime = 0L;
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               testHarness.open();
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 3000));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 3999));
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), initialTime + 20));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), initialTime));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), initialTime + 999));
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1998));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1999));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1000));
+
+
+
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 
Long.MAX_VALUE));
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), 
Long.MAX_VALUE));
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 
Long.MAX_VALUE));
+
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), initialTime + 10999));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1000));
+
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), 
Long.MAX_VALUE));
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 6), 
Long.MAX_VALUE));
+
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1000));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1000));
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 6), 
Long.MAX_VALUE));
+
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+               testHarness.close();
+
+               Assert.assertEquals("Close was not called.", 1, 
closeCalled.get());
+
+       }
+
+       /**
+        * Tests TimeEvictor evictAfter behavior
+        * @throws Exception
+        */
+       @Test
+       public void testTimeEvictorEvictAfter() throws Exception {
+               AtomicInteger closeCalled = new AtomicInteger(0);
+               final int TRIGGER_COUNT = 2;
+               final boolean EVICT_AFTER = true;
+
+               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
+
+               @SuppressWarnings({"unchecked", "rawtypes"})
+               TypeSerializer<StreamRecord<Tuple2<String, Integer>>> 
streamRecordSerializer =
+                       (TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) 
new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig()));
+
+               ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> 
stateDesc =
+                       new ListStateDescriptor<>("window-contents", 
streamRecordSerializer);
+
+
+               EvictingWindowOperator<String, Tuple2<String, Integer>, 
Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
+                       GlobalWindows.create(),
+                       new GlobalWindow.Serializer(),
+                       new TupleKeySelector(),
+                       BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new 
ExecutionConfig()),
+                       stateDesc,
+                       new InternalIterableWindowFunction<>(new 
RichSumReducer<GlobalWindow>(closeCalled)),
+                       CountTrigger.of(TRIGGER_COUNT),
+                       TimeEvictor.of(Time.seconds(2), EVICT_AFTER),
+                       0);
+
+
+               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
+                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+               long initialTime = 0L;
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               testHarness.open();
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1000));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 4000));
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), initialTime + 20));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), initialTime));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), initialTime + 999));
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 3500));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 2001));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1001));
+
+
+
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 
Long.MAX_VALUE));
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 
Long.MAX_VALUE));
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 
Long.MAX_VALUE));
+
+
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), initialTime + 10999));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1002));
+
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), 
Long.MAX_VALUE));
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), 
Long.MAX_VALUE));
+
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+               testHarness.close();
+
+               Assert.assertEquals("Close was not called.", 1, 
closeCalled.get());
+
+       }
+
+       /**
+        * Tests TimeEvictor evictBefore behavior
+        * @throws Exception
+        */
+       @Test
+       public void testTimeEvictorEvictBefore() throws Exception {
+               AtomicInteger closeCalled = new AtomicInteger(0);
+               final int TRIGGER_COUNT = 2;
+               final int WINDOW_SIZE = 4;
+
+               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
+
+               @SuppressWarnings({"unchecked", "rawtypes"})
+               TypeSerializer<StreamRecord<Tuple2<String, Integer>>> 
streamRecordSerializer =
+                       (TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) 
new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig()));
+
+               ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> 
stateDesc =
+                       new ListStateDescriptor<>("window-contents", 
streamRecordSerializer);
+
+
+               EvictingWindowOperator<String, Tuple2<String, Integer>, 
Tuple2<String, Integer>, TimeWindow> operator = new EvictingWindowOperator<>(
+                       TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, 
TimeUnit.SECONDS)),
+                       new TimeWindow.Serializer(),
+                       new TupleKeySelector(),
+                       BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new 
ExecutionConfig()),
+                       stateDesc,
+                       new InternalIterableWindowFunction<>(new 
RichSumReducer<TimeWindow>(closeCalled)),
+                       CountTrigger.of(TRIGGER_COUNT),
+                       TimeEvictor.of(Time.seconds(2)),
+                       0);
+
+
+               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
+                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+               long initialTime = 0L;
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               testHarness.open();
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1000));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 3999));
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), initialTime + 20));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), initialTime));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), initialTime + 999));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), initialTime + 5999));
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 3500));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 2001));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1001));
+
+
+
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 1), 
3999));
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 
3999));
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 
3999));
+
+
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), initialTime + 6500));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1002));
+
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 
7999));
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 
3999));
+
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+               testHarness.close();
+
+               Assert.assertEquals("Close was not called.", 1, 
closeCalled.get());
+
+       }
+
+       /**
+        * Tests time evictor, if no timestamp information in the StreamRecord
+        * No element will be evicted from the window
+        * @throws Exception
+        */
+       @Test
+       public void testTimeEvictorNoTimestamp() throws Exception {
+               AtomicInteger closeCalled = new AtomicInteger(0);
+               final int TRIGGER_COUNT = 2;
+               final boolean EVICT_AFTER = true;
+
+               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
+
+               @SuppressWarnings({"unchecked", "rawtypes"})
+               TypeSerializer<StreamRecord<Tuple2<String, Integer>>> 
streamRecordSerializer =
+                       (TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) 
new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig()));
+
+               ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> 
stateDesc =
+                       new ListStateDescriptor<>("window-contents", 
streamRecordSerializer);
+
+
+               EvictingWindowOperator<String, Tuple2<String, Integer>, 
Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
+                       GlobalWindows.create(),
+                       new GlobalWindow.Serializer(),
+                       new TupleKeySelector(),
+                       BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new 
ExecutionConfig()),
+                       stateDesc,
+                       new InternalIterableWindowFunction<>(new 
RichSumReducer<GlobalWindow>(closeCalled)),
+                       CountTrigger.of(TRIGGER_COUNT),
+                       TimeEvictor.of(Time.seconds(2), EVICT_AFTER),
+                       0);
+
+
+               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
+                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               testHarness.open();
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1)));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1)));
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1)));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1)));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1)));
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1)));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1)));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1)));
+
+
+
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 
Long.MAX_VALUE));
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 
Long.MAX_VALUE));
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), 
Long.MAX_VALUE));
+
+
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1)));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1)));
+
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), 
Long.MAX_VALUE));
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 6), 
Long.MAX_VALUE));
+
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+               testHarness.close();
+
+               Assert.assertEquals("Close was not called.", 1, 
closeCalled.get());
+
+       }
+
+       /**
+        * Tests DeltaEvictor, evictBefore behavior
+        * @throws Exception
+        */
+       @Test
+       public void testDeltaEvictorEvictBefore() throws Exception {
+               AtomicInteger closeCalled = new AtomicInteger(0);
+               final int TRIGGER_COUNT = 2;
+               final boolean EVICT_AFTER = false;
+               final int THRESHOLD = 2;
+
+               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
+
+               @SuppressWarnings({"unchecked", "rawtypes"})
+               TypeSerializer<StreamRecord<Tuple2<String, Integer>>> 
streamRecordSerializer =
+                       (TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) 
new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig()));
+
+               ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> 
stateDesc =
+                       new ListStateDescriptor<>("window-contents", 
streamRecordSerializer);
+
+
+               EvictingWindowOperator<String, Tuple2<String, Integer>, 
Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
+                       GlobalWindows.create(),
+                       new GlobalWindow.Serializer(),
+                       new TupleKeySelector(),
+                       BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new 
ExecutionConfig()),
+                       stateDesc,
+                       new InternalIterableWindowFunction<>(new 
RichSumReducer<GlobalWindow>(closeCalled)),
+                       CountTrigger.of(TRIGGER_COUNT),
+                       DeltaEvictor.of(THRESHOLD, new 
DeltaFunction<Tuple2<String, Integer>>() {
+                               @Override
+                               public double getDelta(Tuple2<String, Integer> 
oldDataPoint, Tuple2<String, Integer> newDataPoint) {
+                                       return newDataPoint.f1 - 
oldDataPoint.f1;
+                               }
+                       }, EVICT_AFTER),
+                       0);
+
+
+
+               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
+                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+               long initialTime = 0L;
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               testHarness.open();
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 3000));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 4), initialTime + 3999));
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), initialTime + 20));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), initialTime));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 5), initialTime + 999));
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 5), initialTime + 1998));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 6), initialTime + 1999));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1000));
+
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), 
Long.MAX_VALUE));
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 11), 
Long.MAX_VALUE));
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 
Long.MAX_VALUE));
+
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 3), initialTime + 10999));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 10), initialTime + 1000));
+
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 8), 
Long.MAX_VALUE));
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 10), 
Long.MAX_VALUE));
+
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+               testHarness.close();
+
+               Assert.assertEquals("Close was not called.", 1, 
closeCalled.get());
+       }
+
+       /**
+        * Tests DeltaEvictor, evictAfter behavior
+        * @throws Exception
+        */
+       @Test
+       public void testDeltaEvictorEvictAfter() throws Exception {
+               AtomicInteger closeCalled = new AtomicInteger(0);
+               final int TRIGGER_COUNT = 2;
+               final boolean EVICT_AFTER = true;
+               final int THRESHOLD = 2;
+
+               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
+
+               @SuppressWarnings({"unchecked", "rawtypes"})
+               TypeSerializer<StreamRecord<Tuple2<String, Integer>>> 
streamRecordSerializer =
+                       (TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) 
new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig()));
+
+               ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> 
stateDesc =
+                       new ListStateDescriptor<>("window-contents", 
streamRecordSerializer);
+
+
+               EvictingWindowOperator<String, Tuple2<String, Integer>, 
Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
+                       GlobalWindows.create(),
+                       new GlobalWindow.Serializer(),
+                       new TupleKeySelector(),
+                       BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new 
ExecutionConfig()),
+                       stateDesc,
+                       new InternalIterableWindowFunction<>(new 
RichSumReducer<GlobalWindow>(closeCalled)),
+                       CountTrigger.of(TRIGGER_COUNT),
+                       DeltaEvictor.of(THRESHOLD, new 
DeltaFunction<Tuple2<String, Integer>>() {
+                               @Override
+                               public double getDelta(Tuple2<String, Integer> 
oldDataPoint, Tuple2<String, Integer> newDataPoint) {
+                                       return newDataPoint.f1 - 
oldDataPoint.f1;
+                               }
+                       }, EVICT_AFTER),
+                       0);
+
+
+
+               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
+                       new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+               long initialTime = 0L;
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               testHarness.open();
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 3000));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 4), initialTime + 3999));
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), initialTime + 20));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), initialTime));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 5), initialTime + 999));
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 5), initialTime + 1998));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 6), initialTime + 1999));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1000));
+
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), 
Long.MAX_VALUE));
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 15), 
Long.MAX_VALUE));
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 
Long.MAX_VALUE));
+
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 9), initialTime + 10999));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 10), initialTime + 1000));
+
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 16), 
Long.MAX_VALUE));
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 22), 
Long.MAX_VALUE));
+
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+               testHarness.close();
+
+               Assert.assertEquals("Close was not called.", 1, 
closeCalled.get());
+       }
+
        @Test
        @SuppressWarnings("unchecked")
        public void testCountTrigger() throws Exception {
@@ -316,6 +777,7 @@ public class EvictingWindowOperatorTest {
                        for (Tuple2<String, Integer> t: input) {
                                sum += t.f1;
                        }
+
                        out.collect(new Tuple2<>(key, sum));
 
                }

Reply via email to