Repository: flink Updated Branches: refs/heads/master 62192c783 -> 74bb7bb63
[FLINK-4174] Enhance evictor functionality Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/74bb7bb6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/74bb7bb6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/74bb7bb6 Branch: refs/heads/master Commit: 74bb7bb63919ce6de5736d52e4e5a254cf9b6509 Parents: 62192c7 Author: Vishnu Viswanath <[email protected]> Authored: Mon Oct 31 18:21:04 2016 -0500 Committer: Aljoscha Krettek <[email protected]> Committed: Tue Nov 15 10:05:24 2016 +0100 ---------------------------------------------------------------------- .../api/windowing/evictors/CountEvictor.java | 54 ++- .../api/windowing/evictors/DeltaEvictor.java | 56 ++- .../api/windowing/evictors/Evictor.java | 52 ++- .../api/windowing/evictors/TimeEvictor.java | 84 +++- .../windowing/EvictingWindowOperator.java | 112 ++++- .../operators/windowing/TimestampedValue.java | 112 +++++ .../operators/windowing/WindowOperator.java | 4 +- .../windowing/EvictingWindowOperatorTest.java | 462 +++++++++++++++++++ 8 files changed, 886 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/74bb7bb6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java index dc82521..8f5b2d9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java @@ -19,7 +19,9 @@ package org.apache.flink.streaming.api.windowing.evictors; import 
org.apache.flink.annotation.PublicEvolving; import org.apache.flink.streaming.api.windowing.windows.Window; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue; + +import java.util.Iterator; /** * An {@link Evictor} that keeps up to a certain amount of elements. @@ -31,26 +33,68 @@ public class CountEvictor<W extends Window> implements Evictor<Object, W> { private static final long serialVersionUID = 1L; private final long maxCount; + private final boolean doEvictAfter; + + private CountEvictor(long count,boolean doEvictAfter) { + this.maxCount = count; + this.doEvictAfter = doEvictAfter; + } private CountEvictor(long count) { this.maxCount = count; + this.doEvictAfter = false; } @Override - public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) { - if (size > maxCount) { - return (int) (size - maxCount); + public void evictBefore(Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) { + if (!doEvictAfter) { + evict(elements, size, ctx); + } + } + + + @Override + public void evictAfter(Iterable<TimestampedValue<Object>> elements, int size,W window, EvictorContext ctx) { + if (doEvictAfter) { + evict(elements, size, ctx); + } + } + + private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) { + if (size <= maxCount) { + return; } else { - return 0; + int evictedCount = 0; + for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();){ + iterator.next(); + evictedCount++; + if (evictedCount > size - maxCount) { + break; + } else { + iterator.remove(); + } + } } } /** * Creates a {@code CountEvictor} that keeps the given number of elements. + * Eviction is done before the window function. * * @param maxCount The number of elements to keep in the pane. 
*/ public static <W extends Window> CountEvictor<W> of(long maxCount) { return new CountEvictor<>(maxCount); } + + /** + * Creates a {@code CountEvictor} that keeps the given number of elements in the pane + * Eviction is done before/after the window function based on the value of doEvictAfter. + * + * @param maxCount The number of elements to keep in the pane. + * @param doEvictAfter Whether to do eviction after the window function. + */ + public static <W extends Window> CountEvictor<W> of(long maxCount, boolean doEvictAfter) { + return new CountEvictor<>(maxCount,doEvictAfter); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/74bb7bb6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java index ef4dad6..7ae33b8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java @@ -21,15 +21,16 @@ import com.google.common.collect.Iterables; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction; import org.apache.flink.streaming.api.windowing.windows.Window; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue; + +import java.util.Iterator; /** * An {@link Evictor} that keeps elements based on a {@link DeltaFunction} and a threshold. * * <p> * Eviction starts from the first element of the buffer and removes all elements from the buffer - * which have a higher delta then the threshold. 
As soon as there is an element with a lower delta, - * the eviction stops. + * which have a delta to the last element that is greater than or equal to the threshold. * * @param <W> The type of {@link Window Windows} on which this {@code Evictor} can operate. */ @@ -39,24 +40,42 @@ public class DeltaEvictor<T, W extends Window> implements Evictor<T, W> { DeltaFunction<T> deltaFunction; private double threshold; + private final boolean doEvictAfter; private DeltaEvictor(double threshold, DeltaFunction<T> deltaFunction) { this.deltaFunction = deltaFunction; this.threshold = threshold; + this.doEvictAfter = false; + } + + private DeltaEvictor(double threshold, DeltaFunction<T> deltaFunction, boolean doEvictAfter) { + this.deltaFunction = deltaFunction; + this.threshold = threshold; + this.doEvictAfter = doEvictAfter; } @Override - public int evict(Iterable<StreamRecord<T>> elements, int size, W window) { - StreamRecord<T> lastElement = Iterables.getLast(elements); - int toEvict = 0; - for (StreamRecord<T> element : elements) { - if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) < this.threshold) { - break; - } - toEvict++; + public void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext ctx) { + if (!doEvictAfter) { + evict(elements, size, ctx); } + } - return toEvict; + @Override + public void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext ctx) { + if (doEvictAfter) { + evict(elements, size, ctx); + } + } + + private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) { + TimestampedValue<T> lastElement = Iterables.getLast(elements); + for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext();){ + TimestampedValue<T> element = iterator.next(); + if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) >= this.threshold) { + iterator.remove(); + } + } } @Override @@ -66,6 +85,7 @@ public class DeltaEvictor<T, W extends Window> implements
Evictor<T, W> { /** * Creates a {@code DeltaEvictor} from the given threshold and {@code DeltaFunction}. + * Eviction is done before the window function. * * @param threshold The threshold * @param deltaFunction The {@code DeltaFunction} @@ -73,4 +93,16 @@ public class DeltaEvictor<T, W extends Window> implements Evictor<T, W> { public static <T, W extends Window> DeltaEvictor<T, W> of(double threshold, DeltaFunction<T> deltaFunction) { return new DeltaEvictor<>(threshold, deltaFunction); } + + /** + * Creates a {@code DeltaEvictor} from the given threshold, {@code DeltaFunction}. + * Eviction is done before/after the window function based on the value of doEvictAfter. + * + * @param threshold The threshold + * @param deltaFunction The {@code DeltaFunction} + * @param doEvictAfter Whether eviction should be done after window function + */ + public static <T, W extends Window> DeltaEvictor<T, W> of(double threshold, DeltaFunction<T> deltaFunction, boolean doEvictAfter) { + return new DeltaEvictor<>(threshold, deltaFunction, doEvictAfter); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/74bb7bb6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java index d8e0daa..02e93eb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java @@ -18,14 +18,17 @@ package org.apache.flink.streaming.api.windowing.evictors; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.metrics.MetricGroup; import 
org.apache.flink.streaming.api.windowing.windows.Window; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; + import java.io.Serializable; /** - * An {@code Evictor} can remove elements from a pane before it is being processed and after - * window evaluation was triggered by a - * {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}. + * An {@code Evictor} can remove elements from a pane before/after the evaluation of WindowFunction and + * after the window evaluation gets triggered by a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger} * * <p> * A pane is the bucket of elements that have the same key (assigned by the @@ -41,13 +44,48 @@ import java.io.Serializable; public interface Evictor<T, W extends Window> extends Serializable { /** - * Computes how many elements should be removed from the pane. The result specifies how - * many elements should be removed from the beginning. + * Optionally evicts elements. Called before windowing function. + * + * @param elements The elements currently in the pane. + * @param size The current number of elements in the pane. + * @param window The {@link Window} + * @param evictorContext The context for the Evictor + */ + void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext); + + /** + * Optionally evicts elements. Called after windowing function. * * @param elements The elements currently in the pane. * @param size The current number of elements in the pane. 
* @param window The {@link Window} + * @param evictorContext The context for the Evictor */ - int evict(Iterable<StreamRecord<T>> elements, int size, W window); + void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext); + + + /** + * A context object that is given to {@link Evictor} methods. + */ + interface EvictorContext { + + /** + * Returns the current processing time, as returned by + * the {@link ProcessingTimeService#getCurrentProcessingTime}. + */ + long getCurrentProcessingTime(); + + /** + * Returns the metric group for this {@link Evictor}. This is the same metric + * group that would be returned from {@link RuntimeContext#getMetricGroup()} in a user + * function. + * + * <p>You must not call methods that create metric objects + * (such as {@link MetricGroup#counter(int)}) multiple times but instead call once + * and store the metric object in a field. + */ + MetricGroup getMetricGroup(); + + } } http://git-wip-us.apache.org/repos/asf/flink/blob/74bb7bb6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java index 369a7ae..33d1cb5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java @@ -17,16 +17,18 @@ */ package org.apache.flink.streaming.api.windowing.evictors; -import com.google.common.collect.Iterables; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.streaming.api.windowing.time.Time; import
org.apache.flink.streaming.api.windowing.windows.Window; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue; + +import java.util.Iterator; /** * An {@link Evictor} that keeps elements for a certain amount of time. Elements older - * than {@code current_time - keep_time} are evicted. + * than {@code current_time - keep_time} are evicted. The current_time is the maximum + * timestamp among the {@link TimestampedValue} elements in the pane. * * @param <W> The type of {@link Window Windows} on which this {@code Evictor} can operate. */ @@ -35,23 +37,71 @@ public class TimeEvictor<W extends Window> implements Evictor<Object, W> { private static final long serialVersionUID = 1L; private final long windowSize; + private final boolean doEvictAfter; public TimeEvictor(long windowSize) { this.windowSize = windowSize; + this.doEvictAfter = false; + } + + public TimeEvictor(long windowSize, boolean doEvictAfter) { + this.windowSize = windowSize; + this.doEvictAfter = doEvictAfter; } + @Override - public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) { - int toEvict = 0; - long currentTime = Iterables.getLast(elements).getTimestamp(); + public void evictBefore(Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) { + if(!doEvictAfter) { + evict(elements,size,ctx); + } + } + + @Override + public void evictAfter(Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) { + if(doEvictAfter) { + evict(elements,size,ctx); + } + } + + private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) { + if (!hasTimestamp(elements)) { + return; + } + + long currentTime = getMaxTimestamp(elements); long evictCutoff = currentTime - windowSize; - for (StreamRecord<Object> record: elements) { - if (record.getTimestamp() > evictCutoff) { - break; + + for (Iterator<TimestampedValue<Object>> iterator = 
elements.iterator(); iterator.hasNext(); ) { + TimestampedValue<Object> record = iterator.next(); + if (record.getTimestamp() <= evictCutoff) { + iterator.remove(); } - toEvict++; } - return toEvict; + } + + /** + * Returns true if the first element in the Iterable of {@link TimestampedValue} has a timestamp. + */ + private boolean hasTimestamp(Iterable<TimestampedValue<Object>> elements) { + Iterator<TimestampedValue<Object>> it = elements.iterator(); + if (it.hasNext()) { + return it.next().hasTimestamp(); + } + return false; + } + + /** + * @param elements The elements currently in the pane. + * @return The maximum value of timestamp among the elements. + */ + private long getMaxTimestamp(Iterable<TimestampedValue<Object>> elements) { + long currentTime = Long.MIN_VALUE; + for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();){ + TimestampedValue<Object> record = iterator.next(); + currentTime = Math.max(currentTime, record.getTimestamp()); + } + return currentTime; } @Override @@ -66,10 +116,22 @@ public class TimeEvictor<W extends Window> implements Evictor<Object, W> { /** * Creates a {@code TimeEvictor} that keeps the given number of elements. + * Eviction is done before the window function. * * @param windowSize The amount of time for which to keep elements. */ public static <W extends Window> TimeEvictor<W> of(Time windowSize) { return new TimeEvictor<>(windowSize.toMilliseconds()); } + + /** + * Creates a {@code TimeEvictor} that keeps elements for the given amount of time. + * Eviction is done before/after the window function based on the value of doEvictAfter. + * + * @param windowSize The amount of time for which to keep elements. + * @param doEvictAfter Whether eviction is done after the window function.
+ */ + public static <W extends Window> TimeEvictor<W> of(Time windowSize, boolean doEvictAfter) { + return new TimeEvictor<>(windowSize.toMilliseconds(),doEvictAfter); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/74bb7bb6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java index f9b409e..3be3f5a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java @@ -29,6 +29,7 @@ import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.streaming.api.operators.InternalTimer; import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; @@ -47,8 +48,8 @@ import static java.util.Objects.requireNonNull; * A {@link WindowOperator} that also allows an {@link Evictor} to be used. * * <p> - * The {@code Evictor} is used to evict elements from panes before processing a window and after - * a {@link Trigger} has fired. + * The {@code Evictor} is used to remove elements from a pane before/after the evaluation of WindowFunction and + * after the window evaluation gets triggered by a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}. 
* * @param <K> The type of key returned by the {@code KeySelector}. * @param <IN> The type of the incoming elements. @@ -62,6 +63,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window private final Evictor<? super IN, ? super W> evictor; + protected transient EvictorContext evictorContext = new EvictorContext(null, null); + private final StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor; public EvictingWindowOperator(WindowAssigner<? super IN, W> windowAssigner, @@ -146,6 +149,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window context.key = key; context.window = actualWindow; + evictorContext.key = key; + evictorContext.window = actualWindow; // we might have already fired because of a merge but still call onElement // on the (possibly merged) window @@ -158,7 +163,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window // if we have no state, there is nothing to do continue; } - fire(actualWindow, contents); + fire(actualWindow, contents, windowState); } if (combinedTriggerResult.isPurge()) { @@ -183,6 +188,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window context.key = key; context.window = window; + evictorContext.key = key; + evictorContext.window = window; TriggerResult triggerResult = context.onElement(element); @@ -192,7 +199,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window // if we have no state, there is nothing to do continue; } - fire(window, contents); + fire(window, contents, windowState); } if (triggerResult.isPurge()) { @@ -209,6 +216,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window context.key = timer.getKey(); context.window = timer.getNamespace(); + evictorContext.key = timer.getKey(); + evictorContext.window = timer.getNamespace(); ListState<StreamRecord<IN>> windowState; MergingWindowSet<W> mergingWindows = null; 
@@ -238,7 +247,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window TriggerResult triggerResult = context.onEventTime(timer.getTimestamp()); if (triggerResult.isFire()) { - fire(context.window, contents); + fire(context.window, contents, windowState); } if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) { @@ -250,6 +259,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window public void onProcessingTime(InternalTimer<K, W> timer) throws Exception { context.key = timer.getKey(); context.window = timer.getNamespace(); + evictorContext.key = timer.getKey(); + evictorContext.window = timer.getNamespace(); ListState<StreamRecord<IN>> windowState; MergingWindowSet<W> mergingWindows = null; @@ -276,7 +287,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp()); if (triggerResult.isFire()) { - fire(context.window, contents); + fire(context.window, contents, windowState); } if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) { @@ -284,22 +295,79 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window } } - private void fire(W window, Iterable<StreamRecord<IN>> contents) throws Exception { + private void fire(W window, Iterable<StreamRecord<IN>> contents, ListState<StreamRecord<IN>> windowState) throws Exception { timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp()); // Work around type system restrictions... 
- int toEvict = evictor.evict((Iterable) contents, Iterables.size(contents), context.window); - - FluentIterable<IN> projectedContents = FluentIterable + FluentIterable<TimestampedValue<IN>> recordsWithTimestamp = FluentIterable .from(contents) - .skip(toEvict) - .transform(new Function<StreamRecord<IN>, IN>() { + .transform(new Function<StreamRecord<IN>, TimestampedValue<IN>>() { + @Override + public TimestampedValue<IN> apply(StreamRecord<IN> input) { + return TimestampedValue.from(input); + } + }); + evictorContext.evictBefore(recordsWithTimestamp, Iterables.size(recordsWithTimestamp)); + + FluentIterable<IN> projectedContents = recordsWithTimestamp + .transform(new Function<TimestampedValue<IN>, IN>() { @Override - public IN apply(StreamRecord<IN> input) { + public IN apply(TimestampedValue<IN> input) { return input.getValue(); } }); + userFunction.apply(context.key, context.window, projectedContents, timestampedCollector); + evictorContext.evictAfter(recordsWithTimestamp, Iterables.size(recordsWithTimestamp)); + + + //work around to fix FLINK-4369, remove the evicted elements from the windowState. + //this is inefficient, but there is no other way to remove elements from ListState, which is an AppendingState. + windowState.clear(); + for(TimestampedValue<IN> record : recordsWithTimestamp) { + windowState.add(record.getStreamRecord()); + } + } + + + /** + * {@code EvictorContext} is a utility for handling {@code Evictor} invocations. It can be reused + * by setting the {@code key} and {@code window} fields. No internal state must be kept in + * the {@code EvictorContext}. 
+ */ + + class EvictorContext implements Evictor.EvictorContext { + + protected K key; + protected W window; + + public EvictorContext(K key, W window) { + this.key = key; + this.window = window; + } + + @Override + public long getCurrentProcessingTime() { + return EvictingWindowOperator.this.getProcessingTimeService().getCurrentProcessingTime(); + } + + @Override + public MetricGroup getMetricGroup() { + return EvictingWindowOperator.this.getMetricGroup(); + } + + + public K getKey() { + return key; + } + + void evictBefore(Iterable<TimestampedValue<IN>> elements, int size) { + evictor.evictBefore((Iterable)elements, size, window, this); + } + + void evictAfter(Iterable<TimestampedValue<IN>> elements, int size) { + evictor.evictAfter((Iterable)elements, size, window, this); + } } private void cleanup(W window, @@ -314,6 +382,24 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window context.clear(); } + @Override + public void open() throws Exception { + super.open(); + evictorContext = new EvictorContext(null,null); + } + + @Override + public void close() throws Exception { + super.close(); + evictorContext = null; + } + + @Override + public void dispose() throws Exception{ + super.dispose(); + evictorContext = null; + } + // ------------------------------------------------------------------------ // Getters for testing // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/74bb7bb6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/TimestampedValue.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/TimestampedValue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/TimestampedValue.java new file mode 100644 index 0000000..9c63a69 --- /dev/null +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/TimestampedValue.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +/** + * Stores the value and the timestamp of the record. + * + * @param <T> The type of the encapsulated value + */ +@PublicEvolving +public class TimestampedValue<T> { + + /** The actual value held by this record */ + private T value; + + /** The timestamp of the record */ + private long timestamp; + + /** Flag whether the timestamp is actually set */ + private boolean hasTimestamp; + + /** + * Creates a new TimestampedValue. The record does not have a timestamp. + */ + public TimestampedValue(T value) { + this.value = value; + } + + /** + * Creates a new TimestampedValue wrapping the given value. The timestamp is set to the + * given timestamp.
+ * + * @param value The value to wrap in this {@link TimestampedValue} + * @param timestamp The timestamp in milliseconds + */ + public TimestampedValue(T value, long timestamp) { + this.value = value; + this.timestamp = timestamp; + this.hasTimestamp = true; + } + + /** + * @return The value wrapped in this {@link TimestampedValue}. + */ + public T getValue() { + return value; + } + + /** + * @return The timestamp associated with this stream value in milliseconds. + */ + public long getTimestamp() { + if (hasTimestamp) { + return timestamp; + } else { + throw new IllegalStateException( + "Record has no timestamp. Is the time characteristic set to 'ProcessingTime', or " + + "did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?"); + } + } + + /** + * Checks whether this record has a timestamp. + * + * @return True if the record has a timestamp, false if not. + */ + public boolean hasTimestamp() { + return hasTimestamp; + } + + /** + * Creates a {@link StreamRecord} from this TimestampedValue. + */ + public StreamRecord<T> getStreamRecord() { + StreamRecord<T> streamRecord = new StreamRecord<>(value); + if (hasTimestamp) { + streamRecord.setTimestamp(timestamp); + } + return streamRecord; + } + + /** + * Creates a TimestampedValue from given {@link StreamRecord}. + * + * @param streamRecord The StreamRecord object from which TimestampedValue is to be created. 
+ */ + public static <T> TimestampedValue<T> from(StreamRecord<T> streamRecord) { + if (streamRecord.hasTimestamp()) { + return new TimestampedValue<>(streamRecord.getValue(), streamRecord.getTimestamp()); + } else { + return new TimestampedValue<>(streamRecord.getValue()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/74bb7bb6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index 229d97d..6ff3999 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -181,7 +181,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> } @Override - public final void open() throws Exception { + public void open() throws Exception { super.open(); timestampedCollector = new TimestampedCollector<>(output); @@ -200,7 +200,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> } @Override - public final void close() throws Exception { + public void close() throws Exception { super.close(); timestampedCollector = null; context = null; http://git-wip-us.apache.org/repos/asf/flink/blob/74bb7bb6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
index 2e3d090..46495b0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -29,10 +29,13 @@ import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.ReduceIterableWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
+import org.apache.flink.streaming.api.windowing.evictors.DeltaEvictor;
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
@@ -58,6 +61,464 @@ public class EvictingWindowOperatorTest {
 
 	// For counting if close() is called the correct number of times on the SumReducer
 
+	/**
+	 * Tests CountEvictor evictAfter behavior
+	 * @throws Exception
+	 */
+	@Test
+	public void testCountEvictorEvictAfter() throws Exception {
+		AtomicInteger closeCalled = new AtomicInteger(0);
+		final int WINDOW_SIZE = 4;
+		final int TRIGGER_COUNT = 2;
+		final boolean EVICT_AFTER = true;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
+			(TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig()));
+
+		ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
+				new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+
+		EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
+			GlobalWindows.create(),
+			new GlobalWindow.Serializer(),
+			new TupleKeySelector(),
+			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+			stateDesc,
+			new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
+			CountTrigger.of(TRIGGER_COUNT),
+			CountEvictor.of(WINDOW_SIZE,EVICT_AFTER),
+			0);
+
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+
+		long initialTime = 0L;
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 6), Long.MAX_VALUE));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 6), Long.MAX_VALUE));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.close();
+
+		Assert.assertEquals("Close was not called.", 1, closeCalled.get());
+
+	}
+
+	/**
+	 * Tests TimeEvictor evictAfter behavior
+	 * @throws Exception
+	 */
+	@Test
+	public void testTimeEvictorEvictAfter() throws Exception {
+		AtomicInteger closeCalled = new AtomicInteger(0);
+		final int TRIGGER_COUNT = 2;
+		final boolean EVICT_AFTER = true;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
+			(TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig()));
+
+		ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
+				new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+
+		EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
+			GlobalWindows.create(),
+			new GlobalWindow.Serializer(),
+			new TupleKeySelector(),
+			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+			stateDesc,
+			new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
+			CountTrigger.of(TRIGGER_COUNT),
+			TimeEvictor.of(Time.seconds(2), EVICT_AFTER),
+			0);
+
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		long initialTime = 0L;
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 4000));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2001));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1001));
+
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), Long.MAX_VALUE));
+
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1002));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), Long.MAX_VALUE));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.close();
+
+		Assert.assertEquals("Close was not called.", 1, closeCalled.get());
+
+	}
+
+	/**
+	 * Tests TimeEvictor evictBefore behavior
+	 * @throws Exception
+	 */
+	@Test
+	public void testTimeEvictorEvictBefore() throws Exception {
+		AtomicInteger closeCalled = new AtomicInteger(0);
+		final int TRIGGER_COUNT = 2;
+		final int WINDOW_SIZE = 4;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
+			(TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig()));
+
+		ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
+				new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+
+		EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new EvictingWindowOperator<>(
+			TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+			new TimeWindow.Serializer(),
+			new TupleKeySelector(),
+			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+			stateDesc,
+			new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>(closeCalled)),
+			CountTrigger.of(TRIGGER_COUNT),
+			TimeEvictor.of(Time.seconds(2)),
+			0);
+
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		long initialTime = 0L;
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 5999));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2001));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1001));
+
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 3999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 3999));
+
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 6500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1002));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 7999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 3999));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.close();
+
+		Assert.assertEquals("Close was not called.", 1, closeCalled.get());
+
+	}
+
+	/**
+	 * Tests time evictor, if no timestamp information in the StreamRecord
+	 * No element will be evicted from the window
+	 * @throws Exception
+	 */
+	@Test
+	public void testTimeEvictorNoTimestamp() throws Exception {
+		AtomicInteger closeCalled = new AtomicInteger(0);
+		final int TRIGGER_COUNT = 2;
+		final boolean EVICT_AFTER = true;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
+			(TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig()));
+
+		ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
+				new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+
+		EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
+			GlobalWindows.create(),
+			new GlobalWindow.Serializer(),
+			new TupleKeySelector(),
+			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+			stateDesc,
+			new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
+			CountTrigger.of(TRIGGER_COUNT),
+			TimeEvictor.of(Time.seconds(2), EVICT_AFTER),
+			0);
+
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1)));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1)));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1)));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
+
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
+
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1)));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 6), Long.MAX_VALUE));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.close();
+
+		Assert.assertEquals("Close was not called.", 1, closeCalled.get());
+
+	}
+
+	/**
+	 * Tests DeltaEvictor, evictBefore behavior
+	 * @throws Exception
+	 */
+	@Test
+	public void testDeltaEvictorEvictBefore() throws Exception {
+		AtomicInteger closeCalled = new AtomicInteger(0);
+		final int TRIGGER_COUNT = 2;
+		final boolean EVICT_AFTER = false;
+		final int THRESHOLD = 2;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
+			(TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig()));
+
+		ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
+				new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+
+		EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
+			GlobalWindows.create(),
+			new GlobalWindow.Serializer(),
+			new TupleKeySelector(),
+			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+			stateDesc,
+			new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
+			CountTrigger.of(TRIGGER_COUNT),
+			DeltaEvictor.of(THRESHOLD, new DeltaFunction<Tuple2<String, Integer>>() {
+				@Override
+				public double getDelta(Tuple2<String, Integer> oldDataPoint, Tuple2<String, Integer> newDataPoint) {
+					return newDataPoint.f1 - oldDataPoint.f1;
+				}
+			}, EVICT_AFTER),
+			0);
+
+
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		long initialTime = 0L;
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), initialTime + 3999));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 5), initialTime + 999));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 1998));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), initialTime + 1999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 11), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 10999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), initialTime + 1000));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 8), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 10), Long.MAX_VALUE));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.close();
+
+		Assert.assertEquals("Close was not called.", 1, closeCalled.get());
+	}
+
+	/**
+	 * Tests DeltaEvictor, evictAfter behavior
+	 * @throws Exception
+	 */
+	@Test
+	public void testDeltaEvictorEvictAfter() throws Exception {
+		AtomicInteger closeCalled = new AtomicInteger(0);
+		final int TRIGGER_COUNT = 2;
+		final boolean EVICT_AFTER = true;
+		final int THRESHOLD = 2;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
+			(TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig()));
+
+		ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
+				new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+
+		EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
+			GlobalWindows.create(),
+			new GlobalWindow.Serializer(),
+			new TupleKeySelector(),
+			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+			stateDesc,
+			new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
+			CountTrigger.of(TRIGGER_COUNT),
+			DeltaEvictor.of(THRESHOLD, new DeltaFunction<Tuple2<String, Integer>>() {
+				@Override
+				public double getDelta(Tuple2<String, Integer> oldDataPoint, Tuple2<String, Integer> newDataPoint) {
+					return newDataPoint.f1 - oldDataPoint.f1;
+				}
+			}, EVICT_AFTER),
+			0);
+
+
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		long initialTime = 0L;
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), initialTime + 3999));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 5), initialTime + 999));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 1998));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), initialTime + 1999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 15), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 9), initialTime + 10999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), initialTime + 1000));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 16), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 22), Long.MAX_VALUE));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.close();
+
+		Assert.assertEquals("Close was not called.", 1, closeCalled.get());
+	}
+
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testCountTrigger() throws Exception {
@@ -316,6 +777,7 @@ public class EvictingWindowOperatorTest {
 			for (Tuple2<String, Integer> t: input) {
 				sum += t.f1;
 			}
+
 			out.collect(new Tuple2<>(key, sum));
 		}
