[FLINK-1176] [streaming] WindowedDataStream rework for new windowing runtime
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c560d76f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c560d76f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c560d76f Branch: refs/heads/master Commit: c560d76fec7b0e389430fc418d83d8f38f5eb06f Parents: 1146f64 Author: Gyula Fora <gyf...@apache.org> Authored: Wed Feb 4 17:27:34 2015 +0100 Committer: mbalassi <mbala...@apache.org> Committed: Mon Feb 16 13:06:07 2015 +0100 ---------------------------------------------------------------------- .../api/datastream/DiscretizedStream.java | 217 +++++++ .../api/datastream/WindowedDataStream.java | 266 +++++---- .../operator/GroupedWindowInvokable.java | 506 ---------------- .../operator/WindowGroupReduceInvokable.java | 51 -- .../api/invokable/operator/WindowInvokable.java | 382 ------------ .../operator/WindowReduceInvokable.java | 59 -- .../operator/windowing/WindowMerger.java | 4 +- .../policy/CloneableEvictionPolicy.java | 1 - .../policy/CloneableTriggerPolicy.java | 1 - .../operator/GroupedWindowInvokableTest.java | 574 ------------------- .../invokable/operator/WindowInvokableTest.java | 261 --------- .../windowing/GroupedWindowInvokableTest.java | 574 +++++++++++++++++++ .../operator/windowing/WindowInvokableTest.java | 261 +++++++++ .../ml/IncrementalLearningSkeleton.java | 2 +- .../socket/SocketTextStreamWordCount.java | 27 +- .../windowing/TopSpeedWindowingExample.java | 2 +- .../scala/examples/join/WindowJoin.scala | 45 +- .../examples/windowing/TopSpeedWindowing.scala | 1 + .../api/scala/WindowedDataStream.scala | 12 +- 19 files changed, 1238 insertions(+), 2008 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c560d76f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java new file mode 100644 index 0000000..37fa1e1 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.datastream; + +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RichGroupReduceFunction; +import org.apache.flink.api.common.functions.RichReduceFunction; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.invokable.StreamInvokable; +import org.apache.flink.streaming.api.invokable.operator.windowing.WindowFlattener; +import org.apache.flink.streaming.api.invokable.operator.windowing.StreamWindow; +import org.apache.flink.streaming.api.invokable.operator.windowing.StreamWindowTypeInfo; +import org.apache.flink.streaming.api.invokable.operator.windowing.WindowMerger; +import org.apache.flink.streaming.api.invokable.operator.windowing.WindowMapper; +import org.apache.flink.streaming.api.invokable.operator.windowing.WindowPartitioner; +import org.apache.flink.streaming.api.invokable.operator.windowing.WindowReducer; + +/** + * A {@link DiscretizedStream} represents a data stream that has been divided + * into windows (predefined chunks). User defined function such as + * {@link #reduceWindow(ReduceFunction)}, + * {@link #mapWindow(GroupReduceFunction)} or aggregations can be applied to the + * windows. + * + * @param <OUT> + * The output type of the {@link DiscretizedStream} + */ +public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> { + + protected SingleOutputStreamOperator<StreamWindow<OUT>, ?> discretizedStream; + + protected DiscretizedStream(SingleOutputStreamOperator<StreamWindow<OUT>, ?> discretizedStream, + KeySelector<OUT, ?> groupByKey) { + super(); + this.groupByKey = groupByKey; + this.discretizedStream = discretizedStream; + } + + /** + * Applies a reduce transformation on the windowed data stream by reducing + * the current window at every trigger.The user can also extend the + * {@link RichReduceFunction} to gain access to other features provided by + * the {@link org.apache.flink.api.common.functions.RichFunction} interface. + * + * @param reduceFunction + * The reduce function that will be applied to the windows. + * @return The transformed DataStream + */ + public DiscretizedStream<OUT> reduceWindow(ReduceFunction<OUT> reduceFunction) { + + DiscretizedStream<OUT> out = partition(false).transform("Window Reduce", getType(), + new WindowReducer<OUT>(reduceFunction)).merge(); + + if (!isGrouped()) { + return out.transform("Window Reduce", out.getType(), new WindowReducer<OUT>( + reduceFunction)); + } else { + return out; + } + } + + /** + * Applies a reduceGroup transformation on the windowed data stream by + * reducing the current window at every trigger. In contrast with the + * standard binary reducer, with reduceGroup the user can access all + * elements of the window at the same time through the iterable interface. + * The user can also extend the {@link RichGroupReduceFunction} to gain + * access to other features provided by the + * {@link org.apache.flink.api.common.functions.RichFunction} interface. + * + * @param reduceFunction + * The reduce function that will be applied to the windows. + * @return The transformed DataStream + */ + public <R> DiscretizedStream<R> mapWindow(GroupReduceFunction<OUT, R> reduceFunction) { + + TypeInformation<R> retType = TypeExtractor.getGroupReduceReturnTypes(reduceFunction, + getType()); + + DiscretizedStream<R> out = partition(true).transform("Window Reduce", retType, + new WindowMapper<OUT, R>(reduceFunction)); + + if (isGrouped()) { + return out.merge(); + } else { + return out; + } + + } + + private <R> DiscretizedStream<R> transform(String operatorName, TypeInformation<R> retType, + StreamInvokable<StreamWindow<OUT>, StreamWindow<R>> invokable) { + + return wrap(discretizedStream.transform(operatorName, new StreamWindowTypeInfo<R>(retType), + invokable)); + } + + private DiscretizedStream<OUT> partition(boolean isMap) { + + int parallelism = discretizedStream.getParallelism(); + + if (isGrouped()) { + DiscretizedStream<OUT> out = transform("Window partitioner", getType(), + new WindowPartitioner<OUT>(groupByKey)).setParallelism(parallelism); + + out.groupByKey = null; + + return out; + } else if (!isMap) { + return transform( + "Window partitioner", + getType(), + new WindowPartitioner<OUT>(discretizedStream.environment + .getDegreeOfParallelism())).setParallelism(parallelism); + } else { + return this; + } + } + + private DiscretizedStream<OUT> setParallelism(int parallelism) { + return wrap(discretizedStream.setParallelism(parallelism)); + } + + private DiscretizedStream<OUT> merge() { + TypeInformation<StreamWindow<OUT>> type = discretizedStream.getType(); + + return wrap(discretizedStream.groupBy(new WindowKey<OUT>()).transform("Window Merger", + type, new WindowMerger<OUT>())); + } + + public DataStream<OUT> flatten() { + return discretizedStream.transform("Window Flatten", getType(), new WindowFlattener<OUT>()); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + private <R> DiscretizedStream<R> wrap(SingleOutputStreamOperator stream) { + return new DiscretizedStream<R>(stream, (KeySelector<R, ?>) this.groupByKey); + } + + public DataStream<StreamWindow<OUT>> getDiscretizedStream() { + return discretizedStream; + } + + @SuppressWarnings("rawtypes") + protected Class<?> getClassAtPos(int pos) { + Class<?> type; + TypeInformation<OUT> outTypeInfo = getType(); + if (outTypeInfo.isTupleType()) { + type = ((TupleTypeInfo) outTypeInfo).getTypeAt(pos).getTypeClass(); + + } else if (outTypeInfo instanceof BasicArrayTypeInfo) { + + type = ((BasicArrayTypeInfo) outTypeInfo).getComponentTypeClass(); + + } else if (outTypeInfo instanceof PrimitiveArrayTypeInfo) { + Class<?> clazz = outTypeInfo.getTypeClass(); + if (clazz == boolean[].class) { + type = Boolean.class; + } else if (clazz == short[].class) { + type = Short.class; + } else if (clazz == int[].class) { + type = Integer.class; + } else if (clazz == long[].class) { + type = Long.class; + } else if (clazz == float[].class) { + type = Float.class; + } else if (clazz == double[].class) { + type = Double.class; + } else if (clazz == char[].class) { + type = Character.class; + } else { + throw new IndexOutOfBoundsException("Type could not be determined for array"); + } + + } else if (pos == 0) { + type = outTypeInfo.getTypeClass(); + } else { + throw new IndexOutOfBoundsException("Position is out of range"); + } + return type; + } + + /** + * Gets the output type. + * + * @return The output type. + */ + public TypeInformation<OUT> getType() { + return ((StreamWindowTypeInfo<OUT>) discretizedStream.getType()).getInnerType(); + } + + protected DiscretizedStream<OUT> copy() { + return new DiscretizedStream<OUT>(discretizedStream.copy(), groupByKey); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/c560d76f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java index 4fe356b..fc117a1 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java @@ -25,17 +25,20 @@ import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.common.functions.RichReduceFunction; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.operators.Keys; import org.apache.flink.streaming.api.function.aggregation.AggregationFunction; import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType; import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator; import org.apache.flink.streaming.api.function.aggregation.SumAggregator; import org.apache.flink.streaming.api.invokable.StreamInvokable; -import org.apache.flink.streaming.api.invokable.operator.GroupedWindowInvokable; -import org.apache.flink.streaming.api.invokable.operator.WindowGroupReduceInvokable; -import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable; +import org.apache.flink.streaming.api.invokable.operator.windowing.GroupedStreamDiscretizer; +import org.apache.flink.streaming.api.invokable.operator.windowing.StreamDiscretizer; +import org.apache.flink.streaming.api.invokable.operator.windowing.StreamWindow; +import org.apache.flink.streaming.api.invokable.operator.windowing.StreamWindowTypeInfo; import org.apache.flink.streaming.api.windowing.helper.Time; import org.apache.flink.streaming.api.windowing.helper.WindowingHelper; import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy; @@ -44,12 +47,14 @@ import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy; import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy; import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy; +import org.apache.flink.streaming.util.keys.KeySelectorUtil; /** * A {@link WindowedDataStream} represents a data stream that has been divided * into windows (predefined chunks). User defined function such as - * {@link #reduce(ReduceFunction)}, {@link #reduceGroup(GroupReduceFunction)} or - * aggregations can be applied to the windows. + * {@link #reduceWindow(ReduceFunction)}, + * {@link #mapWindow(GroupReduceFunction)} or aggregations can be applied to the + * windows. * * @param <OUT> * The output type of the {@link WindowedDataStream} @@ -57,9 +62,12 @@ import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy; public class WindowedDataStream<OUT> { protected DataStream<OUT> dataStream; - protected boolean isGrouped; - protected boolean allCentral; - protected KeySelector<OUT, ?> keySelector; + + protected boolean isLocal = false; + protected boolean isCentral = true; + + protected KeySelector<OUT, ?> discretizerKey; + protected KeySelector<OUT, ?> groupByKey; protected List<WindowingHelper<OUT>> triggerHelpers; protected List<WindowingHelper<OUT>> evictionHelpers; @@ -67,6 +75,10 @@ public class WindowedDataStream<OUT> { protected LinkedList<TriggerPolicy<OUT>> userTriggers; protected LinkedList<EvictionPolicy<OUT>> userEvicters; + protected WindowedDataStream() { + + } + protected WindowedDataStream(DataStream<OUT> dataStream, WindowingHelper<OUT>... policyHelpers) { this.dataStream = dataStream.copy(); this.triggerHelpers = new ArrayList<WindowingHelper<OUT>>(); @@ -75,15 +87,9 @@ public class WindowedDataStream<OUT> { } if (dataStream instanceof GroupedDataStream) { - this.isGrouped = true; - this.keySelector = ((GroupedDataStream<OUT>) dataStream).keySelector; + this.discretizerKey = ((GroupedDataStream<OUT>) dataStream).keySelector; // set all policies distributed - this.allCentral = false; - - } else { - this.isGrouped = false; - // set all policies central - this.allCentral = true; + this.isCentral = false; } } @@ -102,27 +108,23 @@ public class WindowedDataStream<OUT> { } if (dataStream instanceof GroupedDataStream) { - this.isGrouped = true; - this.keySelector = ((GroupedDataStream<OUT>) dataStream).keySelector; + this.discretizerKey = ((GroupedDataStream<OUT>) dataStream).keySelector; // set all policies distributed - this.allCentral = false; + this.isCentral = false; - } else { - this.isGrouped = false; - // set all policies central - this.allCentral = true; } } protected WindowedDataStream(WindowedDataStream<OUT> windowedDataStream) { this.dataStream = windowedDataStream.dataStream.copy(); - this.isGrouped = windowedDataStream.isGrouped; - this.keySelector = windowedDataStream.keySelector; + this.discretizerKey = windowedDataStream.discretizerKey; + this.groupByKey = windowedDataStream.groupByKey; this.triggerHelpers = windowedDataStream.triggerHelpers; this.evictionHelpers = windowedDataStream.evictionHelpers; this.userTriggers = windowedDataStream.userTriggers; this.userEvicters = windowedDataStream.userEvicters; - this.allCentral = windowedDataStream.allCentral; + this.isCentral = windowedDataStream.isCentral; + this.isLocal = windowedDataStream.isLocal; } public <F> F clean(F f) { @@ -169,11 +171,11 @@ public class WindowedDataStream<OUT> { * @return The grouped {@link WindowedDataStream} */ public WindowedDataStream<OUT> groupBy(int... fields) { - WindowedDataStream<OUT> ret = this.copy(); - ret.dataStream = ret.dataStream.groupBy(fields); - ret.isGrouped = true; - ret.keySelector = ((GroupedDataStream<OUT>) ret.dataStream).keySelector; - return ret; + if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) { + return groupBy(new KeySelectorUtil.ArrayKeySelector<OUT>(fields)); + } else { + return groupBy(new Keys.ExpressionKeys<OUT>(fields, getType())); + } } /** @@ -193,11 +195,7 @@ public class WindowedDataStream<OUT> { * @return The grouped {@link WindowedDataStream} */ public WindowedDataStream<OUT> groupBy(String... fields) { - WindowedDataStream<OUT> ret = this.copy(); - ret.dataStream = ret.dataStream.groupBy(fields); - ret.isGrouped = true; - ret.keySelector = ((GroupedDataStream<OUT>) ret.dataStream).keySelector; - return ret; + return groupBy(new Keys.ExpressionKeys<OUT>(fields, getType())); } /** @@ -214,12 +212,48 @@ public class WindowedDataStream<OUT> { */ public WindowedDataStream<OUT> groupBy(KeySelector<OUT, ?> keySelector) { WindowedDataStream<OUT> ret = this.copy(); - ret.dataStream = ret.dataStream.groupBy(keySelector); - ret.isGrouped = true; - ret.keySelector = ((GroupedDataStream<OUT>) ret.dataStream).keySelector; + ret.groupByKey = keySelector; return ret; } + private WindowedDataStream<OUT> groupBy(Keys<OUT> keys) { + return groupBy(clean(KeySelectorUtil.getSelectorForKeys(keys, getType()))); + } + + /** + * Sets the windowed computations local, so that the windowing and reduce or + * aggregation logic will be computed for each parallel instance of this + * operator + * + * @return The local windowed data stream + */ + public WindowedDataStream<OUT> local() { + WindowedDataStream<OUT> out = copy(); + out.isLocal = true; + return out; + } + + private DiscretizedStream<OUT> discretize(boolean isMap) { + + StreamInvokable<OUT, StreamWindow<OUT>> discretizer; + + if (discretizerKey == null) { + discretizer = new StreamDiscretizer<OUT>(getTriggers(), getEvicters()); + } else { + discretizer = new GroupedStreamDiscretizer<OUT>(discretizerKey, + getDistributedTriggers(), getDistributedEvicters(), getCentralTriggers(), + getCentralEvicters()); + } + + int parallelism = isLocal || (discretizerKey != null) ? dataStream.environment + .getDegreeOfParallelism() : 1; + + return new DiscretizedStream<OUT>(dataStream.transform("Stream Discretizer", + new StreamWindowTypeInfo<OUT>(getType()), discretizer).setParallelism(parallelism), + groupByKey); + + } + /** * Applies a reduce transformation on the windowed data stream by reducing * the current window at every trigger.The user can also extend the @@ -230,9 +264,9 @@ public class WindowedDataStream<OUT> { * The reduce function that will be applied to the windows. * @return The transformed DataStream */ - public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reduceFunction) { - return dataStream.transform("Window-Reduce", getType(), - getReduceInvokable(reduceFunction)); + public DiscretizedStream<OUT> reduceWindow(ReduceFunction<OUT> reduceFunction) { + + return discretize(false).reduceWindow(reduceFunction); } /** @@ -248,15 +282,12 @@ public class WindowedDataStream<OUT> { * The reduce function that will be applied to the windows. * @return The transformed DataStream */ - public <R> SingleOutputStreamOperator<R, ?> reduceGroup( - GroupReduceFunction<OUT, R> reduceFunction) { - - TypeInformation<OUT> inType = getType(); - TypeInformation<R> outType = TypeExtractor - .getGroupReduceReturnTypes(reduceFunction, inType); + public <R> WindowedDataStream<R> mapWindow(GroupReduceFunction<OUT, R> reduceFunction) { + return discretize(true).mapWindow(reduceFunction); + } - return dataStream.transform("WindowReduce", outType, - getReduceGroupInvokable(reduceFunction)); + public DataStream<OUT> flatten() { + return dataStream; } /** @@ -269,18 +300,20 @@ public class WindowedDataStream<OUT> { * {@link org.apache.flink.api.common.functions.RichFunction} interface. * </br> </br> This version of reduceGroup uses user supplied * typeinformation for serializaton. Use this only when the system is unable - * to detect type information using: - * {@link #reduceGroup(GroupReduceFunction)} + * to detect type information using: {@link #mapWindow(GroupReduceFunction)} * * @param reduceFunction * The reduce function that will be applied to the windows. * @return The transformed DataStream */ - public <R> SingleOutputStreamOperator<R, ?> reduceGroup( + public <R> SingleOutputStreamOperator<R, ?> mapWindow( GroupReduceFunction<OUT, R> reduceFunction, TypeInformation<R> outType) { - return dataStream.transform("Window-Reduce", outType, - getReduceGroupInvokable(reduceFunction)); + throw new RuntimeException("Not implemented yet"); + } + + protected Class<?> getClassAtPos(int pos) { + return dataStream.getClassAtPos(pos); } /** @@ -291,10 +324,9 @@ public class WindowedDataStream<OUT> { * The position in the tuple/array to sum * @return The transformed DataStream. */ - public SingleOutputStreamOperator<OUT, ?> sum(int positionToSum) { - dataStream.checkFieldRange(positionToSum); + public WindowedDataStream<OUT> sum(int positionToSum) { return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(positionToSum, - dataStream.getClassAtPos(positionToSum), getType())); + getClassAtPos(positionToSum), getType())); } /** @@ -308,7 +340,7 @@ public class WindowedDataStream<OUT> { * The field to sum * @return The transformed DataStream. */ - public SingleOutputStreamOperator<OUT, ?> sum(String field) { + public WindowedDataStream<OUT> sum(String field) { return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field, getType())); } @@ -320,8 +352,7 @@ public class WindowedDataStream<OUT> { * The position to minimize * @return The transformed DataStream. */ - public SingleOutputStreamOperator<OUT, ?> min(int positionToMin) { - dataStream.checkFieldRange(positionToMin); + public WindowedDataStream<OUT> min(int positionToMin) { return aggregate(ComparableAggregator.getAggregator(positionToMin, getType(), AggregationType.MIN)); } @@ -338,7 +369,7 @@ public class WindowedDataStream<OUT> { * applied. * @return The transformed DataStream. */ - public SingleOutputStreamOperator<OUT, ?> min(String field) { + public WindowedDataStream<OUT> min(String field) { return aggregate(ComparableAggregator.getAggregator(field, getType(), AggregationType.MIN, false)); } @@ -352,7 +383,7 @@ public class WindowedDataStream<OUT> { * The position to minimize by * @return The transformed DataStream. */ - public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy) { + public WindowedDataStream<OUT> minBy(int positionToMinBy) { return this.minBy(positionToMinBy, true); } @@ -365,7 +396,7 @@ public class WindowedDataStream<OUT> { * The position to minimize by * @return The transformed DataStream. */ - public SingleOutputStreamOperator<OUT, ?> minBy(String positionToMinBy) { + public WindowedDataStream<OUT> minBy(String positionToMinBy) { return this.minBy(positionToMinBy, true); } @@ -382,8 +413,7 @@ public class WindowedDataStream<OUT> { * minimum value, otherwise returns the last * @return The transformed DataStream. */ - public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy, boolean first) { - dataStream.checkFieldRange(positionToMinBy); + public WindowedDataStream<OUT> minBy(int positionToMinBy, boolean first) { return aggregate(ComparableAggregator.getAggregator(positionToMinBy, getType(), AggregationType.MINBY, first)); } @@ -403,7 +433,7 @@ public class WindowedDataStream<OUT> { * be returned * @return The transformed DataStream. */ - public SingleOutputStreamOperator<OUT, ?> minBy(String field, boolean first) { + public WindowedDataStream<OUT> minBy(String field, boolean first) { return aggregate(ComparableAggregator.getAggregator(field, getType(), AggregationType.MINBY, first)); } @@ -416,8 +446,7 @@ public class WindowedDataStream<OUT> { * The position to maximize * @return The transformed DataStream. */ - public SingleOutputStreamOperator<OUT, ?> max(int positionToMax) { - dataStream.checkFieldRange(positionToMax); + public WindowedDataStream<OUT> max(int positionToMax) { return aggregate(ComparableAggregator.getAggregator(positionToMax, getType(), AggregationType.MAX)); } @@ -434,7 +463,7 @@ public class WindowedDataStream<OUT> { * applied. * @return The transformed DataStream. */ - public SingleOutputStreamOperator<OUT, ?> max(String field) { + public WindowedDataStream<OUT> max(String field) { return aggregate(ComparableAggregator.getAggregator(field, getType(), AggregationType.MAX, false)); } @@ -448,7 +477,7 @@ public class WindowedDataStream<OUT> { * The position to maximize by * @return The transformed DataStream. */ - public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy) { + public WindowedDataStream<OUT> maxBy(int positionToMaxBy) { return this.maxBy(positionToMaxBy, true); } @@ -461,7 +490,7 @@ public class WindowedDataStream<OUT> { * The position to maximize by * @return The transformed DataStream. */ - public SingleOutputStreamOperator<OUT, ?> maxBy(String positionToMaxBy) { + public WindowedDataStream<OUT> maxBy(String positionToMaxBy) { return this.maxBy(positionToMaxBy, true); } @@ -478,8 +507,7 @@ public class WindowedDataStream<OUT> { * maximum value, otherwise returns the last * @return The transformed DataStream. */ - public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy, boolean first) { - dataStream.checkFieldRange(positionToMaxBy); + public WindowedDataStream<OUT> maxBy(int positionToMaxBy, boolean first) { return aggregate(ComparableAggregator.getAggregator(positionToMaxBy, getType(), AggregationType.MAXBY, first)); } @@ -499,21 +527,17 @@ public class WindowedDataStream<OUT> { * be returned * @return The transformed DataStream. */ - public SingleOutputStreamOperator<OUT, ?> maxBy(String field, boolean first) { + public WindowedDataStream<OUT> maxBy(String field, boolean first) { return aggregate(ComparableAggregator.getAggregator(field, getType(), AggregationType.MAXBY, first)); } - private SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregator) { - StreamInvokable<OUT, OUT> invokable = getReduceInvokable(aggregator); - - SingleOutputStreamOperator<OUT, ?> returnStream = dataStream.transform("Window-Aggregation", - getType(), invokable); + private WindowedDataStream<OUT> aggregate(AggregationFunction<OUT> aggregator) { - return returnStream; + return reduceWindow(aggregator); } - private LinkedList<TriggerPolicy<OUT>> getTriggers() { + protected LinkedList<TriggerPolicy<OUT>> getTriggers() { LinkedList<TriggerPolicy<OUT>> triggers = new LinkedList<TriggerPolicy<OUT>>(); @@ -531,7 +555,7 @@ public class WindowedDataStream<OUT> { } - private LinkedList<EvictionPolicy<OUT>> getEvicters() { + protected LinkedList<EvictionPolicy<OUT>> getEvicters() { LinkedList<EvictionPolicy<OUT>> evicters = new LinkedList<EvictionPolicy<OUT>>(); @@ -541,15 +565,15 @@ public class WindowedDataStream<OUT> { } } else { if (userEvicters == null) { - boolean notOnlyTime=false; - for (WindowingHelper<OUT> helper : triggerHelpers){ - if (helper instanceof Time<?>){ + boolean notOnlyTime = false; + for (WindowingHelper<OUT> helper : triggerHelpers) { + if (helper instanceof Time<?>) { evicters.add(helper.toEvict()); } else { - notOnlyTime=true; + notOnlyTime = true; } } - if (notOnlyTime){ + if (notOnlyTime) { evicters.add(new TumblingEvictionPolicy<OUT>()); } } @@ -562,9 +586,9 @@ public class WindowedDataStream<OUT> { return evicters; } - private LinkedList<TriggerPolicy<OUT>> getCentralTriggers() { + protected LinkedList<TriggerPolicy<OUT>> getCentralTriggers() { LinkedList<TriggerPolicy<OUT>> cTriggers = new LinkedList<TriggerPolicy<OUT>>(); - if (allCentral) { + if (isCentral) { cTriggers.addAll(getTriggers()); } else { for (TriggerPolicy<OUT> trigger : getTriggers()) { @@ -576,10 +600,10 @@ public class WindowedDataStream<OUT> { return cTriggers; } - private LinkedList<CloneableTriggerPolicy<OUT>> getDistributedTriggers() { + protected LinkedList<CloneableTriggerPolicy<OUT>> getDistributedTriggers() { LinkedList<CloneableTriggerPolicy<OUT>> dTriggers = null; - if (!allCentral) { + if (!isCentral) { dTriggers = new LinkedList<CloneableTriggerPolicy<OUT>>(); for (TriggerPolicy<OUT> trigger : getTriggers()) { if (!(trigger instanceof TimeTriggerPolicy)) { @@ -591,10 +615,10 @@ public class WindowedDataStream<OUT> { return dTriggers; } - private LinkedList<CloneableEvictionPolicy<OUT>> getDistributedEvicters() { + protected LinkedList<CloneableEvictionPolicy<OUT>> getDistributedEvicters() { LinkedList<CloneableEvictionPolicy<OUT>> evicters = null; - if (!allCentral) { + if (!isCentral) { evicters = new LinkedList<CloneableEvictionPolicy<OUT>>(); for (EvictionPolicy<OUT> evicter : getEvicters()) { evicters.add((CloneableEvictionPolicy<OUT>) evicter); @@ -604,41 +628,14 @@ public class WindowedDataStream<OUT> { return evicters; } - private LinkedList<EvictionPolicy<OUT>> getCentralEvicters() { - if (allCentral) { + protected LinkedList<EvictionPolicy<OUT>> getCentralEvicters() { + if (isCentral) { return getEvicters(); } else { return null; } } - private <R> StreamInvokable<OUT, R> getReduceGroupInvokable(GroupReduceFunction<OUT, R> reducer) { - StreamInvokable<OUT, R> invokable; - if (isGrouped) { - invokable = new GroupedWindowInvokable<OUT, R>(clean(reducer), keySelector, - getDistributedTriggers(), getDistributedEvicters(), getCentralTriggers(), - getCentralEvicters()); - - } else { - invokable = new WindowGroupReduceInvokable<OUT, R>(clean(reducer), getTriggers(), - getEvicters()); - } - return invokable; - } - - private StreamInvokable<OUT, OUT> getReduceInvokable(ReduceFunction<OUT> reducer) { - StreamInvokable<OUT, OUT> invokable; - if (isGrouped) { - invokable = new GroupedWindowInvokable<OUT, OUT>(clean(reducer), keySelector, - getDistributedTriggers(), getDistributedEvicters(), getCentralTriggers(), - getCentralEvicters()); - - } else { - invokable = new WindowReduceInvokable<OUT>(clean(reducer), getTriggers(), getEvicters()); - } - return invokable; - } - /** * Gets the output type. * @@ -648,11 +645,30 @@ public class WindowedDataStream<OUT> { return dataStream.getType(); } - public DataStream<OUT> getDataStream() { + protected DataStream<OUT> getDataStream() { return dataStream; } protected WindowedDataStream<OUT> copy() { return new WindowedDataStream<OUT>(this); } + + protected boolean isGrouped() { + return groupByKey != null; + } + + public DataStream<StreamWindow<OUT>> getDiscretizedStream() { + return discretize(true).getDiscretizedStream(); + } + + protected static class WindowKey<R> implements KeySelector<StreamWindow<R>, Integer> { + + private static final long serialVersionUID = 1L; + + @Override + public Integer getKey(StreamWindow<R> value) throws Exception { + return value.windowID; + } + + } } http://git-wip-us.apache.org/repos/asf/flink/blob/c560d76f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java deleted file mode 100644 index df2edd2..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java +++ /dev/null @@ -1,506 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.operator; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.Map; -import java.util.NoSuchElementException; - -import org.apache.flink.api.common.functions.Function; -import org.apache.flink.api.common.functions.GroupReduceFunction; -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.invokable.StreamInvokable; -import org.apache.flink.streaming.api.streamrecord.StreamRecord; -import org.apache.flink.streaming.api.windowing.policy.ActiveEvictionPolicy; -import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback; -import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy; -import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy; -import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy; -import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy; -import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy; - -/** - * This invokable allows windowing based on {@link TriggerPolicy} and - * {@link EvictionPolicy} instances including their active and cloneable - * versions. It is additionally aware of the creation of windows per group. - * - * A {@link KeySelector} is used to specify the key position or key extraction. - * The {@link ReduceFunction} will be executed on each group separately. - * Policies might either be centralized or distributed. It is not possible to - * use central and distributed eviction policies at the same time. A distributed - * policy have to be a {@link CloneableTriggerPolicy} or - * {@link CloneableEvictionPolicy} as it will be cloned to have separated - * instances for each group. At the startup time the distributed policies will - * be stored as sample, and only clones of them will be used to maintain the - * groups. Therefore, each group starts with the initial policy states. - * - * While a distributed policy only gets notified with the elements belonging to - * the respective group, a centralized policy get notified with all arriving - * elements. When a centralized trigger occurred, all groups get triggered. This - * is done by submitting the element which caused the trigger as real element to - * the groups it belongs to and as fake element to all other groups. Within the - * groups the element might be further processed, causing more triggers, - * prenotifications of active distributed policies and evictions like usual. - * - * Central policies can be instance of {@link ActiveTriggerPolicy} and also - * implement the - * {@link ActiveTriggerPolicy#createActiveTriggerRunnable(ActiveTriggerCallback)} - * method. Fake elements created on prenotification will be forwarded to all - * groups. The {@link ActiveTriggerCallback} is also implemented in a way, that - * it forwards/distributed calls all groups. - * - * @param <IN> - * The type of input elements handled by this operator invokable. - */ -public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> { - - /** - * Auto-generated serial version UID - */ - private static final long serialVersionUID = -3469545957144404137L; - - private KeySelector<IN, ?> keySelector; - private Configuration parameters; - private LinkedList<ActiveTriggerPolicy<IN>> activeCentralTriggerPolicies; - private LinkedList<TriggerPolicy<IN>> centralTriggerPolicies; - private LinkedList<ActiveEvictionPolicy<IN>> activeCentralEvictionPolicies; - private LinkedList<EvictionPolicy<IN>> centralEvictionPolicies; - private LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies; - private LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies; - private Map<Object, WindowInvokable<IN, OUT>> windowingGroups; - private LinkedList<Thread> activePolicyThreads; - private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies; - private LinkedList<WindowInvokable<IN, OUT>> deleteOrderForCentralEviction; - - /** - * This constructor creates an instance of the grouped windowing invokable. - * - * A {@link KeySelector} is used to specify the key position or key - * extraction. The {@link ReduceFunction} will be executed on each group - * separately. Policies might either be centralized or distributed. It is - * not possible to use central and distributed eviction policies at the same - * time. A distributed policy have to be a {@link CloneableTriggerPolicy} or - * {@link CloneableEvictionPolicy} as it will be cloned to have separated - * instances for each group. At the startup time the distributed policies - * will be stored as sample, and only clones of them will be used to - * maintain the groups. Therefore, each group starts with the initial policy - * states. - * - * While a distributed policy only gets notified with the elements belonging - * to the respective group, a centralized policy get notified with all - * arriving elements. When a centralized trigger occurred, all groups get - * triggered. This is done by submitting the element which caused the - * trigger as real element to the groups it belongs to and as fake element - * to all other groups. Within the groups the element might be further - * processed, causing more triggers, prenotifications of active distributed - * policies and evictions like usual. - * - * Central policies can be instance of {@link ActiveTriggerPolicy} and also - * implement the - * {@link ActiveTriggerPolicy#createActiveTriggerRunnable(ActiveTriggerCallback)} - * method. Fake elements created on prenotification will be forwarded to all - * groups. The {@link ActiveTriggerCallback} is also implemented in a way, - * that it forwards/distributed calls all groups. - * - * @param userFunction - * The user defined function. - * @param keySelector - * A key selector to extract the key for the groups from the - * input data. - * @param distributedTriggerPolicies - * Trigger policies to be distributed and maintained individually - * within each group. - * @param distributedEvictionPolicies - * Eviction policies to be distributed and maintained - * individually within each group. Note that there cannot be - * both, central and distributed eviction policies at the same - * time. - * @param centralTriggerPolicies - * Trigger policies which will only exist once at a central - * place. In case a central policy triggers, it will cause all - * groups to be emitted. (Remark: Empty groups cannot be emitted. - * If only one element is contained a group, this element itself - * is returned as aggregated result.) - * @param centralEvictionPolicies - * Eviction which will only exist once at a central place. Note - * that there cannot be both, central and distributed eviction - * policies at the same time. The central eviction policy will - * work on an simulated element buffer containing all elements no - * matter which group they belong to. - */ - public GroupedWindowInvokable(Function userFunction, KeySelector<IN, ?> keySelector, - LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies, - LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies, - LinkedList<TriggerPolicy<IN>> centralTriggerPolicies, - LinkedList<EvictionPolicy<IN>> centralEvictionPolicies) { - - super(userFunction); - - this.keySelector = keySelector; - - // handle the triggers - if (centralTriggerPolicies != null) { - this.centralTriggerPolicies = centralTriggerPolicies; - this.activeCentralTriggerPolicies = new LinkedList<ActiveTriggerPolicy<IN>>(); - - for (TriggerPolicy<IN> trigger : centralTriggerPolicies) { - if (trigger instanceof ActiveTriggerPolicy) { - this.activeCentralTriggerPolicies.add((ActiveTriggerPolicy<IN>) trigger); - } - } - } else { - this.centralTriggerPolicies = new LinkedList<TriggerPolicy<IN>>(); - } - - if (distributedTriggerPolicies != null) { - this.distributedTriggerPolicies = distributedTriggerPolicies; - } else { - this.distributedTriggerPolicies = new LinkedList<CloneableTriggerPolicy<IN>>(); - } - - if (distributedEvictionPolicies != null) { - this.distributedEvictionPolicies = distributedEvictionPolicies; - } else { - this.distributedEvictionPolicies = new LinkedList<CloneableEvictionPolicy<IN>>(); - } - - this.activeCentralEvictionPolicies = new LinkedList<ActiveEvictionPolicy<IN>>(); - - if (centralEvictionPolicies != null) { - this.centralEvictionPolicies = centralEvictionPolicies; - - for (EvictionPolicy<IN> eviction : centralEvictionPolicies) { - if (eviction instanceof ActiveEvictionPolicy) { - this.activeCentralEvictionPolicies.add((ActiveEvictionPolicy<IN>) eviction); - } - } - } else { - this.centralEvictionPolicies = new LinkedList<EvictionPolicy<IN>>(); - } - - this.windowingGroups = new HashMap<Object, WindowInvokable<IN, OUT>>(); - this.activePolicyThreads = new LinkedList<Thread>(); - this.currentTriggerPolicies = new LinkedList<TriggerPolicy<IN>>(); - this.deleteOrderForCentralEviction = new LinkedList<WindowInvokable<IN, OUT>>(); - - // check that not both, central and distributed eviction, is used at the - // same time. - if (!this.centralEvictionPolicies.isEmpty() && !this.distributedEvictionPolicies.isEmpty()) { - throw new UnsupportedOperationException( - "You can only use either central or distributed eviction policies but not both at the same time."); - } - - // Check that there is at least one trigger and one eviction policy - if (this.centralEvictionPolicies.isEmpty() && this.distributedEvictionPolicies.isEmpty()) { - throw new UnsupportedOperationException( - "You have to define at least one eviction policy"); - } - if (this.centralTriggerPolicies.isEmpty() && this.distributedTriggerPolicies.isEmpty()) { - throw new UnsupportedOperationException( - "You have to define at least one trigger policy"); - } - - } - - @Override - public void invoke() throws Exception { - // Prevent empty data streams - if (readNext() == null) { - throw new RuntimeException("DataStream must not be empty"); - } - - // Continuously run - while (nextRecord != null) { - WindowInvokable<IN, OUT> groupInvokable = windowingGroups.get(keySelector - .getKey(nextRecord.getObject())); - if (groupInvokable == null) { - groupInvokable = makeNewGroup(nextRecord); - } - - // Run the precalls for central active triggers - for (ActiveTriggerPolicy<IN> trigger : activeCentralTriggerPolicies) { - Object[] result = trigger.preNotifyTrigger(nextRecord.getObject()); - for (Object in : result) { - - // If central eviction is used, handle it here - if (!activeCentralEvictionPolicies.isEmpty()) { - evictElements(centralActiveEviction(in)); - } - - // process in groups - for (WindowInvokable<IN, OUT> group : windowingGroups.values()) { - group.processFakeElement(in, trigger); - checkForEmptyGroupBuffer(group); - } - } - } - - // Process non-active central triggers - for (TriggerPolicy<IN> triggerPolicy : centralTriggerPolicies) { - if (triggerPolicy.notifyTrigger(nextRecord.getObject())) { - currentTriggerPolicies.add(triggerPolicy); - } - } - - if (currentTriggerPolicies.isEmpty()) { - - // only add the element to its group - groupInvokable.processRealElement(nextRecord.getObject()); - checkForEmptyGroupBuffer(groupInvokable); - - // If central eviction is used, handle it here - if (!centralEvictionPolicies.isEmpty()) { - evictElements(centralEviction(nextRecord.getObject(), false)); - deleteOrderForCentralEviction.add(groupInvokable); - } - - } else { - - // call user function for all groups - for (WindowInvokable<IN, OUT> group : windowingGroups.values()) { - if (group == groupInvokable) { - // process real with initialized policies - group.processRealElement(nextRecord.getObject(), currentTriggerPolicies); - } else { - // process like a fake but also initialized with - // policies - group.externalTriggerFakeElement(nextRecord.getObject(), - currentTriggerPolicies); - } - - // remove group in case it has an empty buffer - // checkForEmptyGroupBuffer(group); - } - - // If central eviction is used, handle it here - if (!centralEvictionPolicies.isEmpty()) { - evictElements(centralEviction(nextRecord.getObject(), true)); - deleteOrderForCentralEviction.add(groupInvokable); - } - } - - // clear current trigger list - currentTriggerPolicies.clear(); - - // read next record - readNext(); - } - - // Stop all remaining threads from policies - for (Thread t : activePolicyThreads) { - t.interrupt(); - } - - // finally trigger the buffer. - for (WindowInvokable<IN, OUT> group : windowingGroups.values()) { - group.emitFinalWindow(centralTriggerPolicies); - } - - } - - /** - * This method creates a new group. The method gets called in case an - * element arrives which has a key which was not seen before. The method - * created a nested {@link WindowInvokable} and therefore created clones of - * all distributed trigger and eviction policies. - * - * @param element - * The element which leads to the generation of a new group - * (previously unseen key) - * @throws Exception - * In case the {@link KeySelector} throws an exception in - * {@link KeySelector#getKey(Object)}, the exception is not - * catched by this method. - */ - @SuppressWarnings("unchecked") - private WindowInvokable<IN, OUT> makeNewGroup(StreamRecord<IN> element) throws Exception { - // clone the policies - LinkedList<TriggerPolicy<IN>> clonedDistributedTriggerPolicies = new LinkedList<TriggerPolicy<IN>>(); - LinkedList<EvictionPolicy<IN>> clonedDistributedEvictionPolicies = new LinkedList<EvictionPolicy<IN>>(); - for (CloneableTriggerPolicy<IN> trigger : this.distributedTriggerPolicies) { - clonedDistributedTriggerPolicies.add(trigger.clone()); - } - for (CloneableEvictionPolicy<IN> eviction : this.distributedEvictionPolicies) { - clonedDistributedEvictionPolicies.add(eviction.clone()); - } - - WindowInvokable<IN, OUT> groupInvokable; - if (userFunction instanceof ReduceFunction) { - groupInvokable = (WindowInvokable<IN, OUT>) new WindowReduceInvokable<IN>( - (ReduceFunction<IN>) userFunction, clonedDistributedTriggerPolicies, - clonedDistributedEvictionPolicies); - } else { - groupInvokable = new WindowGroupReduceInvokable<IN, OUT>( - (GroupReduceFunction<IN, OUT>) userFunction, clonedDistributedTriggerPolicies, - clonedDistributedEvictionPolicies); - } - - groupInvokable.setup(taskContext, executionConfig); - groupInvokable.open(this.parameters); - windowingGroups.put(keySelector.getKey(element.getObject()), groupInvokable); - - return groupInvokable; - } - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - this.parameters = parameters; - for (ActiveTriggerPolicy<IN> tp : activeCentralTriggerPolicies) { - Runnable target = tp.createActiveTriggerRunnable(new WindowingCallback(tp)); - if (target != null) { - Thread thread = new Thread(target); - activePolicyThreads.add(thread); - thread.start(); - } - } - }; - - /** - * This method is used to notify central eviction policies with a real - * element. - * - * @param input - * the real element to notify the eviction policy. - * @param triggered - * whether a central trigger occurred or not. - * @return The number of elements to be deleted from the buffer. - */ - private int centralEviction(IN input, boolean triggered) { - // Process the evictions and take care of double evictions - // In case there are multiple eviction policies present, - // only the one with the highest return value is recognized. - int currentMaxEviction = 0; - for (EvictionPolicy<IN> evictionPolicy : centralEvictionPolicies) { - // use temporary variable to prevent multiple calls to - // notifyEviction - int tmp = evictionPolicy.notifyEviction(input, triggered, - deleteOrderForCentralEviction.size()); - if (tmp > currentMaxEviction) { - currentMaxEviction = tmp; - } - } - return currentMaxEviction; - } - - /** - * This method is used to notify active central eviction policies with a - * fake element. - * - * @param input - * the fake element to notify the active central eviction - * policies. - * @return The number of elements to be deleted from the buffer. - */ - private int centralActiveEviction(Object input) { - // Process the evictions and take care of double evictions - // In case there are multiple eviction policies present, - // only the one with the highest return value is recognized. - int currentMaxEviction = 0; - for (ActiveEvictionPolicy<IN> evictionPolicy : activeCentralEvictionPolicies) { - // use temporary variable to prevent multiple calls to - // notifyEviction - int tmp = evictionPolicy.notifyEvictionWithFakeElement(input, - deleteOrderForCentralEviction.size()); - if (tmp > currentMaxEviction) { - currentMaxEviction = tmp; - } - } - return currentMaxEviction; - } - - /** - * This method is used in central eviction to delete a given number of - * elements from the buffer. - * - * @param numToEvict - * number of elements to delete from the virtual central element - * buffer. - */ - private void evictElements(int numToEvict) { - HashSet<WindowInvokable<IN, OUT>> usedGroups = new HashSet<WindowInvokable<IN, OUT>>(); - for (; numToEvict > 0; numToEvict--) { - WindowInvokable<IN, OUT> currentGroup = deleteOrderForCentralEviction.getFirst(); - // Do the eviction - currentGroup.evictFirst(); - // Remember groups which possibly have an empty buffer after the - // eviction - usedGroups.add(currentGroup); - try { - deleteOrderForCentralEviction.removeFirst(); - } catch (NoSuchElementException e) { - // when buffer is empty, ignore exception and stop deleting - break; - } - - } - - // Remove groups with empty buffer - for (WindowInvokable<IN, OUT> group : usedGroups) { - checkForEmptyGroupBuffer(group); - } - } - - /** - * Checks if the element buffer of a given windowing group is empty. If so, - * the group will be deleted. - * - * @param group - * The windowing group to be checked and and removed in case its - * buffer is empty. - */ - private void checkForEmptyGroupBuffer(WindowInvokable<IN, OUT> group) { - if (group.isBufferEmpty()) { - windowingGroups.remove(group); - } - } - - /** - * This callback class allows to handle the callbacks done by threads - * defined in active trigger policies - * - * @see ActiveTriggerPolicy#createActiveTriggerRunnable(ActiveTriggerCallback) - */ - private class WindowingCallback implements ActiveTriggerCallback { - private ActiveTriggerPolicy<IN> policy; - - public WindowingCallback(ActiveTriggerPolicy<IN> policy) { - this.policy = policy; - } - - @Override - public void sendFakeElement(Object datapoint) { - - // If central eviction is used, handle it here - if (!centralEvictionPolicies.isEmpty()) { - evictElements(centralActiveEviction(datapoint)); - } - - // handle element in groups - for (WindowInvokable<IN, OUT> group : windowingGroups.values()) { - group.processFakeElement(datapoint, policy); - checkForEmptyGroupBuffer(group); - } - } - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c560d76f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java deleted file mode 100644 index b3fdfe8..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.operator; - -import java.util.LinkedList; - -import org.apache.flink.api.common.functions.GroupReduceFunction; -import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy; -import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy; - -public class WindowGroupReduceInvokable<IN, OUT> extends WindowInvokable<IN, OUT> { - - private static final long serialVersionUID = 1L; - GroupReduceFunction<IN, OUT> reducer; - - public WindowGroupReduceInvokable(GroupReduceFunction<IN, OUT> userFunction, - LinkedList<TriggerPolicy<IN>> triggerPolicies, - LinkedList<EvictionPolicy<IN>> evictionPolicies) { - super(userFunction, triggerPolicies, evictionPolicies); - this.reducer = userFunction; - } - - @Override - protected void callUserFunction() throws Exception { - reducer.reduce(copyBuffer(), collector); - } - - public LinkedList<IN> copyBuffer() { - LinkedList<IN> copy = new LinkedList<IN>(); - for (IN element : buffer) { - copy.add(copy(element)); - } - return copy; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/c560d76f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java deleted file mode 100644 index ea891c9..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java +++ /dev/null @@ -1,382 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.operator; - -import java.util.LinkedList; -import java.util.List; -import java.util.NoSuchElementException; - -import org.apache.flink.api.common.functions.Function; -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.streaming.api.invokable.StreamInvokable; -import org.apache.flink.streaming.api.windowing.policy.ActiveEvictionPolicy; -import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback; -import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy; -import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy; -import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy; - -public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> { - - /** - * Auto-generated serial version UID - */ - private static final long serialVersionUID = -8038984294071650730L; - - private LinkedList<TriggerPolicy<IN>> triggerPolicies; - private LinkedList<EvictionPolicy<IN>> evictionPolicies; - private LinkedList<ActiveTriggerPolicy<IN>> activeTriggerPolicies; - private LinkedList<ActiveEvictionPolicy<IN>> activeEvictionPolicies; - private LinkedList<Thread> activePolicyTreads; - protected LinkedList<IN> buffer; - private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies; - - /** - * This constructor created a windowing invokable using trigger and eviction - * policies. - * - * @param userFunction - * The user defined {@link ReduceFunction} - * @param triggerPolicies - * A list of {@link TriggerPolicy}s and/or - * {@link ActiveTriggerPolicy}s - * @param evictionPolicies - * A list of {@link EvictionPolicy}s and/or - * {@link ActiveEvictionPolicy}s - */ - public WindowInvokable(Function userFunction, LinkedList<TriggerPolicy<IN>> triggerPolicies, - LinkedList<EvictionPolicy<IN>> evictionPolicies) { - super(userFunction); - - this.triggerPolicies = triggerPolicies; - this.evictionPolicies = evictionPolicies; - - activeTriggerPolicies = new LinkedList<ActiveTriggerPolicy<IN>>(); - for (TriggerPolicy<IN> tp : triggerPolicies) { - if (tp instanceof ActiveTriggerPolicy) { - activeTriggerPolicies.add((ActiveTriggerPolicy<IN>) tp); - } - } - - activeEvictionPolicies = new LinkedList<ActiveEvictionPolicy<IN>>(); - for (EvictionPolicy<IN> ep : evictionPolicies) { - if (ep instanceof ActiveEvictionPolicy) { - activeEvictionPolicies.add((ActiveEvictionPolicy<IN>) ep); - } - } - - this.activePolicyTreads = new LinkedList<Thread>(); - this.buffer = new LinkedList<IN>(); - this.currentTriggerPolicies = new LinkedList<TriggerPolicy<IN>>(); - } - - @Override - public void open(org.apache.flink.configuration.Configuration parameters) throws Exception { - super.open(parameters); - for (ActiveTriggerPolicy<IN> tp : activeTriggerPolicies) { - Runnable target = tp.createActiveTriggerRunnable(new WindowingCallback(tp)); - if (target != null) { - Thread thread = new Thread(target); - activePolicyTreads.add(thread); - thread.start(); - } - } - }; - - /** - * This class allows the active trigger threads to call back and push fake - * elements at any time. - */ - private class WindowingCallback implements ActiveTriggerCallback { - private ActiveTriggerPolicy<IN> policy; - - public WindowingCallback(ActiveTriggerPolicy<IN> policy) { - this.policy = policy; - } - - @Override - public void sendFakeElement(Object datapoint) { - processFakeElement(datapoint, this.policy); - } - - } - - @Override - public void invoke() throws Exception { - - // Prevent empty data streams - if (readNext() == null) { - throw new RuntimeException("DataStream must not be empty"); - } - - // Continuously run - while (nextRecord != null) { - processRealElement(nextRecord.getObject()); - - // Load next StreamRecord - readNext(); - } - - // Stop all remaining threads from policies - for (Thread t : activePolicyTreads) { - t.interrupt(); - } - - // finally trigger the buffer. - emitFinalWindow(null); - - } - - /** - * This method gets called in case of an grouped windowing in case central - * trigger occurred and the arriving element causing the trigger is not part - * of this group. - * - * Remark: This is NOT the same as - * {@link WindowInvokable#processFakeElement(Object, TriggerPolicy)}! Here - * the eviction using active policies takes place after the call to the UDF. - * Usually it is done before when fake elements get submitted. This special - * behaviour is needed to allow the {@link GroupedWindowInvokable} to send - * central triggers to all groups, even if the current element does not - * belong to the group. - * - * @param input - * a fake input element - * @param policies - * the list of policies which caused the call with this fake - * element - */ - protected synchronized void externalTriggerFakeElement(IN input, - List<TriggerPolicy<IN>> policies) { - - // Set the current triggers - currentTriggerPolicies.addAll(policies); - - // emit - callUserFunctionAndLogException(); - - // clear the flag collection - currentTriggerPolicies.clear(); - - // Process the evictions and take care of double evictions - // In case there are multiple eviction policies present, - // only the one with the highest return value is recognized. - int currentMaxEviction = 0; - for (ActiveEvictionPolicy<IN> evictionPolicy : activeEvictionPolicies) { - // use temporary variable to prevent multiple calls to - // notifyEviction - int tmp = evictionPolicy.notifyEvictionWithFakeElement(input, buffer.size()); - if (tmp > currentMaxEviction) { - currentMaxEviction = tmp; - } - } - - for (int i = 0; i < currentMaxEviction; i++) { - try { - buffer.removeFirst(); - } catch (NoSuchElementException e) { - // In case no more elements are in the buffer: - // Prevent failure and stop deleting. - break; - } - } - } - - /** - * This method processed an arrived fake element The method is synchronized - * to ensure that it cannot interleave with - * {@link WindowInvokable#processRealElement(Object)} - * - * @param input - * a fake input element - * @param currentPolicy - * the policy which produced this fake element - */ - protected synchronized void processFakeElement(Object input, TriggerPolicy<IN> currentPolicy) { - - // Process the evictions and take care of double evictions - // In case there are multiple eviction policies present, - // only the one with the highest return value is recognized. - int currentMaxEviction = 0; - for (ActiveEvictionPolicy<IN> evictionPolicy : activeEvictionPolicies) { - // use temporary variable to prevent multiple calls to - // notifyEviction - int tmp = evictionPolicy.notifyEvictionWithFakeElement(input, buffer.size()); - if (tmp > currentMaxEviction) { - currentMaxEviction = tmp; - } - } - - for (int i = 0; i < currentMaxEviction; i++) { - try { - buffer.removeFirst(); - } catch (NoSuchElementException e) { - // In case no more elements are in the buffer: - // Prevent failure and stop deleting. - break; - } - } - - // Set the current trigger - currentTriggerPolicies.add(currentPolicy); - - // emit - callUserFunctionAndLogException(); - - // clear the flag collection - currentTriggerPolicies.clear(); - } - - /** - * This method processed an arrived real element The method is synchronized - * to ensure that it cannot interleave with - * {@link WindowInvokable#processFakeElement(Object)}. - * - * @param input - * a real input element - * @param triggerPolicies - * Allows to set trigger policies which are maintained - * externally. This is the case for central policies in - * {@link GroupedWindowInvokable}. - */ - protected synchronized void processRealElement(IN input, List<TriggerPolicy<IN>> triggerPolicies) { - this.currentTriggerPolicies.addAll(triggerPolicies); - processRealElement(input); - } - - /** - * This method processed an arrived real element The method is synchronized - * to ensure that it cannot interleave with - * {@link WindowInvokable#processFakeElement(Object)} - * - * @param input - * a real input element - */ - protected synchronized void processRealElement(IN input) { - - // Run the precalls to detect missed windows - for (ActiveTriggerPolicy<IN> trigger : activeTriggerPolicies) { - // Remark: In case multiple active triggers are present the ordering - // of the different fake elements returned by this triggers becomes - // a problem. This might lead to unexpected results... - // Should we limit the number of active triggers to 0 or 1? - Object[] result = trigger.preNotifyTrigger(input); - for (Object in : result) { - processFakeElement(in, trigger); - } - } - - // Remember if a trigger occurred - boolean isTriggered = false; - - // Process the triggers - for (TriggerPolicy<IN> triggerPolicy : triggerPolicies) { - if (triggerPolicy.notifyTrigger(input)) { - currentTriggerPolicies.add(triggerPolicy); - } - } - - // call user function - if (!currentTriggerPolicies.isEmpty()) { - // emit - callUserFunctionAndLogException(); - - // clear the flag collection - currentTriggerPolicies.clear(); - - // remember trigger - isTriggered = true; - } - - // Process the evictions and take care of double evictions - // In case there are multiple eviction policies present, - // only the one with the highest return value is recognized. - int currentMaxEviction = 0; - - for (EvictionPolicy<IN> evictionPolicy : evictionPolicies) { - // use temporary variable to prevent multiple calls to - // notifyEviction - int tmp = evictionPolicy.notifyEviction(input, isTriggered, buffer.size()); - if (tmp > currentMaxEviction) { - currentMaxEviction = tmp; - } - } - - for (int i = 0; i < currentMaxEviction; i++) { - try { - buffer.removeFirst(); - } catch (NoSuchElementException e) { - // In case no more elements are in the buffer: - // Prevent failure and stop deleting. - break; - } - } - - // Add the current element to the buffer - buffer.add(input); - - } - - /** - * This method removes the first element from the element buffer. It is used - * to provide central evictions in {@link GroupedWindowInvokable} - */ - protected synchronized void evictFirst() { - try { - buffer.removeFirst(); - } catch (NoSuchElementException e) { - // ignore exception - } - } - - /** - * This method returns whether the element buffer is empty or not. It is - * used to figure out if a group can be deleted or not when - * {@link GroupedWindowInvokable} is used. - * - * @return true in case the buffer is empty otherwise false. - */ - protected boolean isBufferEmpty() { - return buffer.isEmpty(); - } - - /** - * This method does the final reduce at the end of the stream and emits the - * result. - * - * @param centralTriggerPolicies - * Allows to set trigger policies which are maintained - * externally. This is the case for central policies in - * {@link GroupedWindowInvokable}. - */ - protected void emitFinalWindow(List<TriggerPolicy<IN>> centralTriggerPolicies) { - if (!buffer.isEmpty()) { - currentTriggerPolicies.clear(); - - if (centralTriggerPolicies != null) { - currentTriggerPolicies.addAll(centralTriggerPolicies); - } - - for (TriggerPolicy<IN> policy : triggerPolicies) { - currentTriggerPolicies.add(policy); - } - - callUserFunctionAndLogException(); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/c560d76f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java deleted file mode 100644 index ed246c8..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.operator; - -import java.util.Iterator; -import java.util.LinkedList; - -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy; -import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy; - -public class WindowReduceInvokable<IN> extends WindowInvokable<IN, IN> { - - private static final long serialVersionUID = 1L; - - ReduceFunction<IN> reducer; - - public WindowReduceInvokable(ReduceFunction<IN> userFunction, - LinkedList<TriggerPolicy<IN>> triggerPolicies, - LinkedList<EvictionPolicy<IN>> evictionPolicies) { - super(userFunction, triggerPolicies, evictionPolicies); - this.reducer = userFunction; - } - - @Override - protected void callUserFunction() throws Exception { - Iterator<IN> reducedIterator = buffer.iterator(); - IN reduced = null; - - while (reducedIterator.hasNext() && reduced == null) { - reduced = reducedIterator.next(); - } - - while (reducedIterator.hasNext()) { - IN next = reducedIterator.next(); - if (next != null) { - reduced = reducer.reduce(copy(reduced), copy(next)); - } - } - if (reduced != null) { - collector.collect(reduced); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c560d76f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java index 738c78f..1766b0b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java @@ -22,11 +22,11 @@ import java.util.Map; import org.apache.flink.streaming.api.invokable.ChainableInvokable; -public class WindowCombiner<T> extends ChainableInvokable<StreamWindow<T>, StreamWindow<T>> { +public class WindowMerger<T> extends ChainableInvokable<StreamWindow<T>, StreamWindow<T>> { private Map<Integer, StreamWindow<T>> windows; - public WindowCombiner() { + public WindowMerger() { super(null); this.windows = new HashMap<Integer, StreamWindow<T>>(); } http://git-wip-us.apache.org/repos/asf/flink/blob/c560d76f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java index 132b495..1937b3f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java @@ -17,7 +17,6 @@ package org.apache.flink.streaming.api.windowing.policy; -import org.apache.flink.streaming.api.invokable.operator.GroupedWindowInvokable; /** * When used in {@link GroupedWindowInvokable}, eviction policies must http://git-wip-us.apache.org/repos/asf/flink/blob/c560d76f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java index f5772a1..6a04461 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java @@ -17,7 +17,6 @@ package org.apache.flink.streaming.api.windowing.policy; -import org.apache.flink.streaming.api.invokable.operator.GroupedWindowInvokable; /** * When used in {@link GroupedWindowInvokable}, trigger policies can provide