[streaming] WindowMapFunction added + Streaming package structure cleanup
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bb5dc7e3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bb5dc7e3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bb5dc7e3 Branch: refs/heads/master Commit: bb5dc7e3dd6825f8f78f42df6c4ca5ef00d520fe Parents: 3885823 Author: Gyula Fora <gyf...@apache.org> Authored: Fri Feb 13 11:03:08 2015 +0100 Committer: mbalassi <mbala...@apache.org> Committed: Mon Feb 16 13:06:08 2015 +0100 ---------------------------------------------------------------------- .../flink/api/java/typeutils/TypeExtractor.java | 2 +- .../streaming/api/datastream/DataStream.java | 16 +- .../api/datastream/DiscretizedStream.java | 55 ++--- .../api/datastream/WindowedDataStream.java | 218 +++++++++---------- .../temporaloperator/StreamCrossOperator.java | 13 +- .../environment/StreamExecutionEnvironment.java | 1 + .../api/function/RichWindowMapFunction.java | 31 +++ .../api/function/WindowMapFunction.java | 28 +++ .../function/source/ParallelSourceFunction.java | 40 ++-- .../operator/windowing/BasicWindowBuffer.java | 74 ------- .../windowing/CompletePreAggregator.java | 23 -- .../windowing/GroupedStreamDiscretizer.java | 2 + .../windowing/GroupedTimeDiscretizer.java | 1 + .../operator/windowing/StreamDiscretizer.java | 2 + .../operator/windowing/StreamWindow.java | 183 ---------------- .../windowing/StreamWindowSerializer.java | 138 ------------ .../windowing/StreamWindowTypeInfo.java | 74 ------- .../windowing/TumblingGroupedPreReducer.java | 95 -------- .../operator/windowing/TumblingPreReducer.java | 78 ------- .../operator/windowing/WindowBuffer.java | 36 --- .../operator/windowing/WindowFlattener.java | 1 + .../operator/windowing/WindowMapper.java | 22 +- .../operator/windowing/WindowMerger.java | 1 + .../operator/windowing/WindowPartitioner.java | 1 + .../operator/windowing/WindowReducer.java | 1 + .../streaming/api/windowing/StreamWindow.java | 181 
+++++++++++++++ .../api/windowing/StreamWindowSerializer.java | 134 ++++++++++++ .../api/windowing/StreamWindowTypeInfo.java | 74 +++++++ .../windowbuffer/BasicWindowBuffer.java | 75 +++++++ .../windowbuffer/CompletePreAggregator.java | 23 ++ .../windowbuffer/TumblingGroupedPreReducer.java | 96 ++++++++ .../windowbuffer/TumblingPreReducer.java | 79 +++++++ .../windowing/windowbuffer/WindowBuffer.java | 37 ++++ .../api/invokable/operator/CoFlatMapTest.java | 84 ------- .../invokable/operator/CoGroupedReduceTest.java | 125 ----------- .../api/invokable/operator/CoMapTest.java | 57 ----- .../invokable/operator/CoStreamReduceTest.java | 71 ------ .../api/invokable/operator/CoWindowTest.java | 182 ---------------- .../invokable/operator/co/CoFlatMapTest.java | 84 +++++++ .../operator/co/CoGroupedReduceTest.java | 125 +++++++++++ .../api/invokable/operator/co/CoMapTest.java | 57 +++++ .../operator/co/CoStreamReduceTest.java | 71 ++++++ .../api/invokable/operator/co/CoWindowTest.java | 182 ++++++++++++++++ .../windowing/BasicWindowBufferTest.java | 83 ------- .../windowing/GroupedStreamDiscretizerTest.java | 2 + .../windowing/StreamDiscretizerTest.java | 2 + .../operator/windowing/StreamWindowTest.java | 201 ----------------- .../TumblingGroupedPreReducerTest.java | 124 ----------- .../windowing/TumblingPreReducerTest.java | 110 ---------- .../windowing/WindowIntegrationTest.java | 7 +- .../api/windowing/StreamWindowTest.java | 201 +++++++++++++++++ .../windowbuffer/BasicWindowBufferTest.java | 86 ++++++++ .../TumblingGroupedPreReducerTest.java | 127 +++++++++++ .../windowbuffer/TumblingPreReducerTest.java | 113 ++++++++++ .../ml/IncrementalLearningSkeleton.java | 6 +- .../examples/windowing/StockPrices.java | 30 +-- .../api/scala/WindowedDataStream.scala | 14 +- .../flink/streaming/api/scala/package.scala | 2 +- 58 files changed, 2032 insertions(+), 1949 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index 1b3003f..63273f8 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -212,7 +212,7 @@ public class TypeExtractor { // -------------------------------------------------------------------------------------------- @SuppressWarnings("unchecked") - private static <IN, OUT> TypeInformation<OUT> getUnaryOperatorReturnType(Function function, Class<?> baseClass, + public static <IN, OUT> TypeInformation<OUT> getUnaryOperatorReturnType(Function function, Class<?> baseClass, boolean hasIterable, boolean hasCollector, TypeInformation<IN> inType, String functionName, boolean allowMissing) { http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index c24ab99..e766626 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -208,7 +208,7 @@ public class DataStream<OUT> { return environment; } - public 
ExecutionConfig getExecutionConfig() { + protected ExecutionConfig getExecutionConfig() { return environment.getConfig(); } @@ -254,11 +254,12 @@ public class DataStream<OUT> { /** * Creates a new {@link ConnectedDataStream} by connecting - * {@link DataStream} outputs of different type with each other. The - * DataStreams connected using this operators can be used with CoFunctions. + * {@link DataStream} outputs of (possibly) different types with each other. + * The DataStreams connected using this operator can be used with + * CoFunctions to apply joint transformations. * * @param dataStream - * The DataStream with which this stream will be joined. + * The DataStream with which this stream will be connected. * @return The {@link ConnectedDataStream}. */ public <R> ConnectedDataStream<OUT, R> connect(DataStream<R> dataStream) { @@ -502,9 +503,10 @@ public class DataStream<OUT> { } /** - * Applies a reduce transformation on the data stream. The user can also - * extend the {@link RichReduceFunction} to gain access to other features - * provided by the + * Applies a reduce transformation on the data stream. The returned stream + * contains all the intermediate values of the reduce transformation. The + * user can also extend the {@link RichReduceFunction} to gain access to + * other features provided by the * {@link org.apache.flink.api.common.functions.RichFunction} interface. 
* * @param reducer http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java index c17169d..7ab7c2c 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java @@ -18,9 +18,8 @@ package org.apache.flink.streaming.api.datastream; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.common.functions.RichReduceFunction; import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; @@ -28,21 +27,21 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.function.WindowMapFunction; import org.apache.flink.streaming.api.invokable.StreamInvokable; -import org.apache.flink.streaming.api.invokable.operator.windowing.StreamWindow; -import org.apache.flink.streaming.api.invokable.operator.windowing.StreamWindowTypeInfo; import 
org.apache.flink.streaming.api.invokable.operator.windowing.WindowFlattener; import org.apache.flink.streaming.api.invokable.operator.windowing.WindowMapper; import org.apache.flink.streaming.api.invokable.operator.windowing.WindowMerger; import org.apache.flink.streaming.api.invokable.operator.windowing.WindowPartitioner; import org.apache.flink.streaming.api.invokable.operator.windowing.WindowReducer; +import org.apache.flink.streaming.api.windowing.StreamWindow; +import org.apache.flink.streaming.api.windowing.StreamWindowTypeInfo; /** * A {@link DiscretizedStream} represents a data stream that has been divided * into windows (predefined chunks). User defined function such as - * {@link #reduceWindow(ReduceFunction)}, - * {@link #mapWindow(GroupReduceFunction)} or aggregations can be applied to the - * windows. + * {@link #reduceWindow(ReduceFunction)}, {@link #mapWindow()} or aggregations + * can be applied to the windows. * * @param <OUT> * The output type of the {@link DiscretizedStream} @@ -60,6 +59,14 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> { this.transformation = tranformation; } + public DataStream<OUT> flatten() { + return discretizedStream.transform("Window Flatten", getType(), new WindowFlattener<OUT>()); + } + + public DataStream<StreamWindow<OUT>> getDiscretizedStream() { + return discretizedStream; + } + /** * Applies a reduce transformation on the windowed data stream by reducing * the current window at every trigger.The user can also extend the @@ -79,7 +86,7 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> { if (!isGrouped()) { return out.transform(WindowTransformation.REDUCEWINDOW, "Window Reduce", out.getType(), - new WindowReducer<OUT>(reduceFunction)); + new WindowReducer<OUT>(discretizedStream.clean(reduceFunction))); } else { return out; } @@ -90,29 +97,27 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> { * reducing the current window at every trigger. 
In contrast with the * standard binary reducer, with reduceGroup the user can access all * elements of the window at the same time through the iterable interface. - * The user can also extend the {@link RichGroupReduceFunction} to gain - * access to other features provided by the - * {@link org.apache.flink.api.common.functions.RichFunction} interface. + * The user can also extend the to gain access to other features provided by + * the {@link org.apache.flink.api.common.functions.RichFunction} interface. * - * @param reduceFunction + * @param windowMapFunction * The reduce function that will be applied to the windows. * @return The transformed DataStream */ @Override - public <R> DiscretizedStream<R> mapWindow(GroupReduceFunction<OUT, R> reduceFunction) { + public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction) { - TypeInformation<R> retType = TypeExtractor.getGroupReduceReturnTypes(reduceFunction, - getType()); + TypeInformation<R> retType = getWindowMapReturnTypes(windowMapFunction, getType()); - return mapWindow(reduceFunction, retType); + return mapWindow(windowMapFunction, retType); } @Override - public <R> DiscretizedStream<R> mapWindow(GroupReduceFunction<OUT, R> reduceFunction, + public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction, TypeInformation<R> returnType) { DiscretizedStream<R> out = partition(transformation).transform( WindowTransformation.REDUCEWINDOW, "Window Reduce", returnType, - new WindowMapper<OUT, R>(reduceFunction)); + new WindowMapper<OUT, R>(discretizedStream.clean(windowMapFunction))); if (isGrouped()) { return out.merge(); @@ -159,10 +164,6 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> { type, new WindowMerger<OUT>())); } - public DataStream<OUT> flatten() { - return discretizedStream.transform("Window Flatten", getType(), new WindowFlattener<OUT>()); - } - @SuppressWarnings("rawtypes") private <R> DiscretizedStream<R> 
wrap(SingleOutputStreamOperator stream) { return wrap(stream, transformation); @@ -174,10 +175,6 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> { return new DiscretizedStream<R>(stream, (KeySelector<R, ?>) this.groupByKey, transformation); } - public DataStream<StreamWindow<OUT>> getDiscretizedStream() { - return discretizedStream; - } - @SuppressWarnings("rawtypes") protected Class<?> getClassAtPos(int pos) { Class<?> type; @@ -231,6 +228,12 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> { return ((StreamWindowTypeInfo<OUT>) discretizedStream.getType()).getInnerType(); } + private static <IN, OUT> TypeInformation<OUT> getWindowMapReturnTypes( + WindowMapFunction<IN, OUT> windowMapInterface, TypeInformation<IN> inType) { + return TypeExtractor.getUnaryOperatorReturnType((Function) windowMapInterface, + WindowMapFunction.class, true, true, inType, null, false); + } + protected DiscretizedStream<OUT> copy() { return new DiscretizedStream<OUT>(discretizedStream.copy(), groupByKey, transformation); } http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java index 04a29a9..3ff5859 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java @@ -19,30 +19,24 @@ package org.apache.flink.streaming.api.datastream; import 
org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.Function; -import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.common.functions.RichReduceFunction; import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.operators.Keys; +import org.apache.flink.streaming.api.function.WindowMapFunction; import org.apache.flink.streaming.api.function.aggregation.AggregationFunction; import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType; import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator; import org.apache.flink.streaming.api.function.aggregation.SumAggregator; import org.apache.flink.streaming.api.invokable.StreamInvokable; -import org.apache.flink.streaming.api.invokable.operator.windowing.BasicWindowBuffer; -import org.apache.flink.streaming.api.invokable.operator.windowing.CompletePreAggregator; import org.apache.flink.streaming.api.invokable.operator.windowing.GroupedStreamDiscretizer; import org.apache.flink.streaming.api.invokable.operator.windowing.GroupedTimeDiscretizer; import org.apache.flink.streaming.api.invokable.operator.windowing.StreamDiscretizer; -import org.apache.flink.streaming.api.invokable.operator.windowing.StreamWindow; -import org.apache.flink.streaming.api.invokable.operator.windowing.StreamWindowTypeInfo; -import org.apache.flink.streaming.api.invokable.operator.windowing.TumblingGroupedPreReducer; -import org.apache.flink.streaming.api.invokable.operator.windowing.TumblingPreReducer; -import 
org.apache.flink.streaming.api.invokable.operator.windowing.WindowBuffer; +import org.apache.flink.streaming.api.windowing.StreamWindow; +import org.apache.flink.streaming.api.windowing.StreamWindowTypeInfo; import org.apache.flink.streaming.api.windowing.helper.Time; import org.apache.flink.streaming.api.windowing.helper.WindowingHelper; import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy; @@ -51,20 +45,36 @@ import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy; import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy; import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy; +import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer; +import org.apache.flink.streaming.api.windowing.windowbuffer.CompletePreAggregator; +import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingGroupedPreReducer; +import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingPreReducer; +import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer; import org.apache.flink.streaming.util.keys.KeySelectorUtil; /** * A {@link WindowedDataStream} represents a data stream that has been divided * into windows (predefined chunks). User defined function such as - * {@link #reduceWindow(ReduceFunction)}, - * {@link #mapWindow(GroupReduceFunction)} or aggregations can be applied to the - * windows. + * {@link #reduceWindow(ReduceFunction)}, {@link #mapWindow()} or aggregations + * can be applied to the windows. 
* * @param <OUT> * The output type of the {@link WindowedDataStream} */ public class WindowedDataStream<OUT> { + protected enum WindowTransformation { + + REDUCEWINDOW, MAPWINDOW, NONE; + + private Function UDF; + + public WindowTransformation with(Function UDF) { + this.UDF = UDF; + return this; + } + } + protected DataStream<OUT> dataStream; protected boolean isLocal = false; @@ -113,10 +123,6 @@ public class WindowedDataStream<OUT> { public WindowedDataStream() { } - public <F> F clean(F f) { - return dataStream.clean(f); - } - /** * Defines the slide size (trigger frequency) for the windowed data stream. * This controls how often the user defined function will be triggered on @@ -201,7 +207,7 @@ public class WindowedDataStream<OUT> { } private WindowedDataStream<OUT> groupBy(Keys<OUT> keys) { - return groupBy(clean(KeySelectorUtil.getSelectorForKeys(keys, getType(), + return groupBy(dataStream.clean(KeySelectorUtil.getSelectorForKeys(keys, getType(), getExecutionConfig()))); } @@ -218,73 +224,13 @@ public class WindowedDataStream<OUT> { return out; } - private DiscretizedStream<OUT> discretize(WindowTransformation transformation, - WindowBuffer<OUT> windowBuffer) { - - StreamInvokable<OUT, StreamWindow<OUT>> discretizer = getDiscretizer(transformation, - windowBuffer, getTrigger(), getEviction(), discretizerKey); - - int parallelism = getDiscretizerParallelism(); - - return new DiscretizedStream<OUT>(dataStream.transform("Stream Discretizer", - new StreamWindowTypeInfo<OUT>(getType()), discretizer).setParallelism(parallelism), - groupByKey, transformation); - - } - - protected enum WindowTransformation { - - REDUCEWINDOW, MAPWINDOW, NONE; - - private Function UDF; - - public WindowTransformation with(Function UDF) { - this.UDF = UDF; - return this; - } - } - - private int getDiscretizerParallelism() { - return isLocal || (discretizerKey != null) ? 
dataStream.environment - .getDegreeOfParallelism() : 1; - } - - private StreamInvokable<OUT, StreamWindow<OUT>> getDiscretizer( - WindowTransformation transformation, WindowBuffer<OUT> windowBuffer, - TriggerPolicy<OUT> trigger, EvictionPolicy<OUT> eviction, - KeySelector<OUT, ?> discretizerKey) { - - if (discretizerKey == null) { - return new StreamDiscretizer<OUT>(trigger, eviction, windowBuffer); - } else if (trigger instanceof TimeTriggerPolicy - && ((TimeTriggerPolicy<OUT>) trigger).timestampWrapper.isDefaultTimestamp()) { - return new GroupedTimeDiscretizer<OUT>(discretizerKey, - (TimeTriggerPolicy<OUT>) trigger, (CloneableEvictionPolicy<OUT>) eviction, - windowBuffer); - } else { - return new GroupedStreamDiscretizer<OUT>(discretizerKey, - (CloneableTriggerPolicy<OUT>) trigger, (CloneableEvictionPolicy<OUT>) eviction, - windowBuffer); - } - + public DataStream<StreamWindow<OUT>> getDiscretizedStream() { + return discretize(WindowTransformation.NONE, new BasicWindowBuffer<OUT>()) + .getDiscretizedStream(); } - @SuppressWarnings("unchecked") - private WindowBuffer<OUT> getWindowBuffer(WindowTransformation transformation, - TriggerPolicy<OUT> trigger, EvictionPolicy<OUT> eviction, - KeySelector<OUT, ?> discretizerKey) { - - if (transformation == WindowTransformation.REDUCEWINDOW - && eviction instanceof TumblingEvictionPolicy) { - if (groupByKey == null) { - return new TumblingPreReducer<OUT>((ReduceFunction<OUT>) transformation.UDF, - getType().createSerializer(getExecutionConfig())); - } else { - return new TumblingGroupedPreReducer<OUT>((ReduceFunction<OUT>) transformation.UDF, - groupByKey, getType().createSerializer(getExecutionConfig())); - } - } - return new BasicWindowBuffer<OUT>(); + public DataStream<OUT> flatten() { + return dataStream; } /** @@ -317,17 +263,16 @@ public class WindowedDataStream<OUT> { * reducing the current window at every trigger. 
In contrast with the * standard binary reducer, with reduceGroup the user can access all * elements of the window at the same time through the iterable interface. - * The user can also extend the {@link RichGroupReduceFunction} to gain - * access to other features provided by the - * {@link org.apache.flink.api.common.functions.RichFunction} interface. + * The user can also extend the to gain access to other features provided by + * the {@link org.apache.flink.api.common.functions.RichFunction} interface. * - * @param reduceFunction + * @param windowMapFunction * The reduce function that will be applied to the windows. * @return The transformed DataStream */ - public <R> WindowedDataStream<R> mapWindow(GroupReduceFunction<OUT, R> reduceFunction) { - return discretize(WindowTransformation.MAPWINDOW.with(reduceFunction), - new BasicWindowBuffer<OUT>()).mapWindow(reduceFunction); + public <R> WindowedDataStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction) { + return discretize(WindowTransformation.MAPWINDOW.with(windowMapFunction), + new BasicWindowBuffer<OUT>()).mapWindow(windowMapFunction); } /** @@ -335,30 +280,80 @@ public class WindowedDataStream<OUT> { * reducing the current window at every trigger. In contrast with the * standard binary reducer, with reduceGroup the user can access all * elements of the window at the same time through the iterable interface. - * The user can also extend the {@link RichGroupReduceFunction} to gain - * access to other features provided by the - * {@link org.apache.flink.api.common.functions.RichFunction} interface. + * The user can also extend the to gain access to other features provided by + * the {@link org.apache.flink.api.common.functions.RichFunction} interface. * </br> </br> This version of reduceGroup uses user supplied * typeinformation for serializaton. 
Use this only when the system is unable - * to detect type information using: {@link #mapWindow(GroupReduceFunction)} + * to detect type information using: {@link #mapWindow()} * - * @param reduceFunction + * @param windowMapFunction * The reduce function that will be applied to the windows. * @return The transformed DataStream */ - public <R> WindowedDataStream<R> mapWindow(GroupReduceFunction<OUT, R> reduceFunction, + public <R> WindowedDataStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction, TypeInformation<R> outType) { - return discretize(WindowTransformation.MAPWINDOW.with(reduceFunction), - new BasicWindowBuffer<OUT>()).mapWindow(reduceFunction, outType); + return discretize(WindowTransformation.MAPWINDOW.with(windowMapFunction), + new BasicWindowBuffer<OUT>()).mapWindow(windowMapFunction, outType); } - public DataStream<OUT> flatten() { - return dataStream; + private DiscretizedStream<OUT> discretize(WindowTransformation transformation, + WindowBuffer<OUT> windowBuffer) { + + StreamInvokable<OUT, StreamWindow<OUT>> discretizer = getDiscretizer(transformation, + windowBuffer, getTrigger(), getEviction(), discretizerKey); + + int parallelism = getDiscretizerParallelism(); + + return new DiscretizedStream<OUT>(dataStream.transform("Stream Discretizer", + new StreamWindowTypeInfo<OUT>(getType()), discretizer).setParallelism(parallelism), + groupByKey, transformation); + } - protected Class<?> getClassAtPos(int pos) { - return dataStream.getClassAtPos(pos); + private int getDiscretizerParallelism() { + return isLocal || (discretizerKey != null) ? 
dataStream.environment + .getDegreeOfParallelism() : 1; + } + + private StreamInvokable<OUT, StreamWindow<OUT>> getDiscretizer( + WindowTransformation transformation, WindowBuffer<OUT> windowBuffer, + TriggerPolicy<OUT> trigger, EvictionPolicy<OUT> eviction, + KeySelector<OUT, ?> discretizerKey) { + + if (discretizerKey == null) { + return new StreamDiscretizer<OUT>(trigger, eviction, windowBuffer); + } else if (trigger instanceof TimeTriggerPolicy + && ((TimeTriggerPolicy<OUT>) trigger).timestampWrapper.isDefaultTimestamp()) { + return new GroupedTimeDiscretizer<OUT>(discretizerKey, + (TimeTriggerPolicy<OUT>) trigger, (CloneableEvictionPolicy<OUT>) eviction, + windowBuffer); + } else { + return new GroupedStreamDiscretizer<OUT>(discretizerKey, + (CloneableTriggerPolicy<OUT>) trigger, (CloneableEvictionPolicy<OUT>) eviction, + windowBuffer); + } + + } + + @SuppressWarnings("unchecked") + private WindowBuffer<OUT> getWindowBuffer(WindowTransformation transformation, + TriggerPolicy<OUT> trigger, EvictionPolicy<OUT> eviction, + KeySelector<OUT, ?> discretizerKey) { + + if (transformation == WindowTransformation.REDUCEWINDOW + && eviction instanceof TumblingEvictionPolicy) { + if (groupByKey == null) { + return new TumblingPreReducer<OUT>( + dataStream.clean((ReduceFunction<OUT>) transformation.UDF), getType() + .createSerializer(getExecutionConfig())); + } else { + return new TumblingGroupedPreReducer<OUT>( + dataStream.clean((ReduceFunction<OUT>) transformation.UDF), groupByKey, + getType().createSerializer(getExecutionConfig())); + } + } + return new BasicWindowBuffer<OUT>(); } /** @@ -610,6 +605,10 @@ public class WindowedDataStream<OUT> { } + protected boolean isGrouped() { + return groupByKey != null; + } + /** * Gets the output type. 
* @@ -619,21 +618,16 @@ public class WindowedDataStream<OUT> { return dataStream.getType(); } - protected WindowedDataStream<OUT> copy() { - return new WindowedDataStream<OUT>(this); - } - - protected boolean isGrouped() { - return groupByKey != null; + public ExecutionConfig getExecutionConfig() { + return dataStream.getExecutionConfig(); } - public DataStream<StreamWindow<OUT>> getDiscretizedStream() { - return discretize(WindowTransformation.NONE, new BasicWindowBuffer<OUT>()) - .getDiscretizedStream(); + protected Class<?> getClassAtPos(int pos) { + return dataStream.getClassAtPos(pos); } - public ExecutionConfig getExecutionConfig() { - return dataStream.getExecutionConfig(); + protected WindowedDataStream<OUT> copy() { + return new WindowedDataStream<OUT>(this); } protected static class WindowKey<R> implements KeySelector<StreamWindow<R>, Integer> { http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java index 03160c2..9af1648 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit; import org.apache.flink.api.common.functions.CrossFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; +import 
org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.operators.CrossOperator; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TupleTypeInfo; @@ -33,16 +34,24 @@ import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable; public class StreamCrossOperator<I1, I2> extends TemporalOperator<I1, I2, StreamCrossOperator.CrossWindow<I1, I2>> { - + public StreamCrossOperator(DataStream<I1> input1, DataStream<I2> input2) { super(input1, input2); } + protected <F> F clean(F f) { + if (input1.getExecutionEnvironment().getConfig().isClosureCleanerEnabled()) { + ClosureCleaner.clean(f, true); + } + ClosureCleaner.ensureSerializable(f); + return f; + } + @Override protected CrossWindow<I1, I2> createNextWindowOperator() { CrossWindowFunction<I1, I2, Tuple2<I1, I2>> crossWindowFunction = new CrossWindowFunction<I1, I2, Tuple2<I1, I2>>( - input1.clean(new CrossOperator.DefaultCrossFunction<I1, I2>())); + clean(new CrossOperator.DefaultCrossFunction<I1, I2>())); return new CrossWindow<I1, I2>(this, input1.connect(input2).addGeneralWindowCombine( crossWindowFunction, http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 65dde79..8e09506 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.List; import com.esotericsoftware.kryo.Serializer; + import org.apache.commons.lang3.Validate; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.io.InputFormat; http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/RichWindowMapFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/RichWindowMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/RichWindowMapFunction.java new file mode 100644 index 0000000..ac2a19e --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/RichWindowMapFunction.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.function; + +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.util.Collector; + +public abstract class RichWindowMapFunction<IN, OUT> extends AbstractRichFunction implements + WindowMapFunction<IN, OUT> { + + private static final long serialVersionUID = 9052714915997374185L; + + @Override + public abstract void mapWindow(Iterable<IN> values, Collector<OUT> out) throws Exception; + +} http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WindowMapFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WindowMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WindowMapFunction.java new file mode 100644 index 0000000..273d731 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WindowMapFunction.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.function; + +import java.io.Serializable; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.util.Collector; + +public interface WindowMapFunction<T, O> extends Function, Serializable { + + void mapWindow(Iterable<T> values, Collector<O> out) throws Exception; +} http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java index 46d4fe9..041915f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java @@ -1,26 +1,22 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and * limitations under the License. 
- */ - -package org.apache.flink.streaming.api.function.source; - -import org.apache.flink.util.Collector; - + */ + +package org.apache.flink.streaming.api.function.source; + public interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> { - public void invoke(Collector<OUT> collector) throws Exception; - -} +} http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/BasicWindowBuffer.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/BasicWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/BasicWindowBuffer.java deleted file mode 100644 index 4c6e7cd..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/BasicWindowBuffer.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.api.invokable.operator.windowing; - -import java.util.LinkedList; -import java.util.NoSuchElementException; - -import org.apache.flink.util.Collector; - -public class BasicWindowBuffer<T> implements WindowBuffer<T> { - - private static final long serialVersionUID = 1L; - protected LinkedList<T> buffer; - - public BasicWindowBuffer() { - this.buffer = new LinkedList<T>(); - } - - public boolean emitWindow(Collector<StreamWindow<T>> collector) { - if (!buffer.isEmpty()) { - StreamWindow<T> currentWindow = new StreamWindow<T>(); - currentWindow.addAll(buffer); - collector.collect(currentWindow); - return true; - } else { - return false; - } - } - - public void store(T element) throws Exception { - buffer.add(element); - } - - public void evict(int n) { - for (int i = 0; i < n; i++) { - try { - buffer.removeFirst(); - } catch (NoSuchElementException e) { - // In case no more elements are in the buffer: - // Prevent failure and stop deleting. - break; - } - } - } - - public int size() { - return buffer.size(); - } - - @Override - public BasicWindowBuffer<T> clone() { - return new BasicWindowBuffer<T>(); - } - - @Override - public String toString() { - return buffer.toString(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/CompletePreAggregator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/CompletePreAggregator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/CompletePreAggregator.java deleted file mode 100644 index e071c2b..0000000 --- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/CompletePreAggregator.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUStreamRecord<?>WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.api.invokable.operator.windowing; - -public interface CompletePreAggregator { - -} http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java index ae6a2d9..58d7adb 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java @@ -23,8 +23,10 @@ import java.util.Map; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.invokable.StreamInvokable; +import org.apache.flink.streaming.api.windowing.StreamWindow; import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy; +import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer; public class GroupedStreamDiscretizer<IN> extends StreamInvokable<IN, StreamWindow<IN>> { http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedTimeDiscretizer.java ---------------------------------------------------------------------- diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedTimeDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedTimeDiscretizer.java index 6d38ed9..5363c10 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedTimeDiscretizer.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedTimeDiscretizer.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.invokable.operator.windowing; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy; +import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer; public class GroupedTimeDiscretizer<IN> extends GroupedStreamDiscretizer<IN> { http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java index 6b0bcec..a14058a 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java +++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java @@ -18,11 +18,13 @@ package org.apache.flink.streaming.api.invokable.operator.windowing; import org.apache.flink.streaming.api.invokable.StreamInvokable; +import org.apache.flink.streaming.api.windowing.StreamWindow; import org.apache.flink.streaming.api.windowing.policy.ActiveEvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback; import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy; import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy; +import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer; public class StreamDiscretizer<IN> extends StreamInvokable<IN, StreamWindow<IN>> { http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindow.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindow.java deleted file mode 100644 index 90c4e62..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindow.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.operator.windowing; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.util.Collector; - -public class StreamWindow<T> extends ArrayList<T> implements Collector<T> { - - private static final long serialVersionUID = -5150196421193988403L; - private static Random rnd = new Random(); - - public int windowID; - public int transformationID; - - public int numberOfParts; - - public StreamWindow() { - this(rnd.nextInt(), rnd.nextInt(), 1); - } - - public StreamWindow(int windowID) { - this(windowID, rnd.nextInt(), 1); - } - - public StreamWindow(int windowID, int transformationID, int numberOfParts) { - super(); - this.windowID = windowID; - this.transformationID = transformationID; - this.numberOfParts = numberOfParts; - } - - public StreamWindow(StreamWindow<T> window) { - this(window.windowID, window.transformationID, window.numberOfParts); - addAll(window); - } - - public StreamWindow(StreamWindow<T> window, TypeSerializer<T> serializer) { - this(window.windowID, window.transformationID, window.numberOfParts); - for (T element : window) { - add(serializer.copy(element)); - } - } - - public List<StreamWindow<T>> 
partitionBy(KeySelector<T, ?> keySelector) throws Exception { - Map<Object, StreamWindow<T>> partitions = new HashMap<Object, StreamWindow<T>>(); - - for (T value : this) { - Object key = keySelector.getKey(value); - StreamWindow<T> window = partitions.get(key); - if (window == null) { - window = new StreamWindow<T>(this.windowID, this.transformationID, 0); - partitions.put(key, window); - } - window.add(value); - } - - List<StreamWindow<T>> output = new ArrayList<StreamWindow<T>>(); - int numkeys = partitions.size(); - - for (StreamWindow<T> window : partitions.values()) { - output.add(window.setNumberOfParts(numkeys)); - } - - return output; - } - - public List<StreamWindow<T>> split(int n) { - int numElements = size(); - if (n > numElements) { - return split(numElements); - } else { - List<StreamWindow<T>> split = new ArrayList<StreamWindow<T>>(); - int splitSize = numElements / n; - - int index = -1; - - StreamWindow<T> currentSubWindow = new StreamWindow<T>(windowID, transformationID, n); - split.add(currentSubWindow); - - for (T element : this) { - index++; - if (index == splitSize && split.size() < n) { - currentSubWindow = new StreamWindow<T>(windowID, transformationID, n); - split.add(currentSubWindow); - index = 0; - } - currentSubWindow.add(element); - } - return split; - } - } - - public StreamWindow<T> setNumberOfParts(int n) { - this.numberOfParts = n; - return this; - } - - public boolean compatibleWith(StreamWindow<T> otherWindow) { - return this.windowID == otherWindow.windowID && this.numberOfParts > 1; - } - - public static <R> StreamWindow<R> merge(StreamWindow<R>... 
windows) { - StreamWindow<R> window = new StreamWindow<R>(windows[0]); - for (int i = 1; i < windows.length; i++) { - StreamWindow<R> next = windows[i]; - if (window.compatibleWith(next)) { - window.addAll(next); - window.numberOfParts--; - } else { - throw new RuntimeException("Can only merge compatible windows"); - } - } - return window; - } - - public static <R> StreamWindow<R> merge(List<StreamWindow<R>> windows) { - if (windows.isEmpty()) { - throw new RuntimeException("Need at least one window to merge"); - } else { - StreamWindow<R> window = new StreamWindow<R>(windows.get(0)); - for (int i = 1; i < windows.size(); i++) { - StreamWindow<R> next = windows.get(i); - if (window.compatibleWith(next)) { - window.addAll(next); - window.numberOfParts--; - } else { - throw new RuntimeException("Can only merge compatible windows"); - } - } - return window; - } - } - - @Override - public boolean equals(Object o) { - return super.equals(o); - } - - @Override - public void collect(T record) { - add(record); - } - - @Override - public void close() { - } - - @Override - public String toString() { - return super.toString(); - } - - public static <R> StreamWindow<R> fromElements(R... 
elements) { - StreamWindow<R> window = new StreamWindow<R>(); - for (R element : elements) { - window.add(element); - } - return window; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindowSerializer.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindowSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindowSerializer.java deleted file mode 100755 index 002e440..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindowSerializer.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUStreamRecord<?>WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.api.invokable.operator.windowing; - -import java.io.IOException; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; - -public final class StreamWindowSerializer<T> extends TypeSerializer<StreamWindow<T>> { - - private static final long serialVersionUID = 1L; - - private final TypeSerializer<T> typeSerializer; - TypeSerializer<Integer> intSerializer; - - public StreamWindowSerializer(TypeInformation<T> typeInfo, ExecutionConfig conf) { - this.typeSerializer = typeInfo.createSerializer(conf); - this.intSerializer = BasicTypeInfo.INT_TYPE_INFO.createSerializer(conf); - } - - public TypeSerializer<T> getObjectSerializer() { - return typeSerializer; - } - - @Override - public boolean isImmutableType() { - return false; - } - - @Override - public StreamWindow<T> createInstance() { - return new StreamWindow<T>(0, 0, 0); - } - - @Override - public StreamWindow<T> copy(StreamWindow<T> from) { - return new StreamWindow<T>(from, typeSerializer); - } - - @Override - public StreamWindow<T> copy(StreamWindow<T> from, StreamWindow<T> reuse) { - reuse.clear(); - reuse.windowID = from.windowID; - reuse.transformationID = from.transformationID; - reuse.numberOfParts = from.numberOfParts; - - for (T element : from) { - reuse.add(typeSerializer.copy(element)); - } - return reuse; - } - - @Override - public int getLength() { - return -1; - } - - @Override - public void serialize(StreamWindow<T> value, DataOutputView target) throws IOException { - - intSerializer.serialize(value.windowID, target); - intSerializer.serialize(value.transformationID, target); - intSerializer.serialize(value.numberOfParts, target); - intSerializer.serialize(value.size(), 
target); - - for (T element : value) { - typeSerializer.serialize(element, target); - } - } - - @Override - public StreamWindow<T> deserialize(DataInputView source) throws IOException { - StreamWindow<T> window = createInstance(); - - window.windowID = intSerializer.deserialize(source); - window.transformationID = intSerializer.deserialize(source); - window.numberOfParts = intSerializer.deserialize(source); - - int size = intSerializer.deserialize(source); - - for (int i = 0; i < size; i++) { - window.add(typeSerializer.deserialize(source)); - } - - return window; - } - - @Override - public StreamWindow<T> deserialize(StreamWindow<T> reuse, DataInputView source) - throws IOException { - - StreamWindow<T> window = reuse; - window.clear(); - - window.windowID = source.readInt(); - window.transformationID = source.readInt(); - window.numberOfParts = source.readInt(); - - int size = source.readInt(); - - for (int i = 0; i < size; i++) { - window.add(typeSerializer.deserialize(source)); - } - - return window; - } - - @Override - public void copy(DataInputView source, DataOutputView target) throws IOException { - // Needs to be implemented - } - - @Override - public TypeSerializer<StreamWindow<T>> duplicate() { - return this; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindowTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindowTypeInfo.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindowTypeInfo.java deleted file mode 100644 index 0054759..0000000 --- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindowTypeInfo.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUStreamRecord<?>WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.api.invokable.operator.windowing; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; - -public class StreamWindowTypeInfo<T> extends TypeInformation<StreamWindow<T>> { - - private static final long serialVersionUID = 1L; - TypeInformation<T> innerType; - - public StreamWindowTypeInfo(TypeInformation<T> innerType) { - this.innerType = innerType; - } - - public TypeInformation<T> getInnerType() { - return innerType; - } - - @Override - public boolean isBasicType() { - return innerType.isBasicType(); - } - - @Override - public boolean isTupleType() { - return innerType.isTupleType(); - } - - @Override - public int getArity() { - return innerType.getArity(); - } - - @Override - public Class<StreamWindow<T>> getTypeClass() { - // TODO Auto-generated method stub - return null; - } - - @Override - public boolean isKeyType() { - return innerType.isKeyType(); - } - - @Override - public TypeSerializer<StreamWindow<T>> createSerializer(ExecutionConfig conf) { - return new StreamWindowSerializer<T>(innerType, conf); - } - - @Override - public int getTotalFields() { - return innerType.getTotalFields(); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/TumblingGroupedPreReducer.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/TumblingGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/TumblingGroupedPreReducer.java deleted file mode 100644 index f1d7ae4..0000000 --- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/TumblingGroupedPreReducer.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.operator.windowing; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.util.Collector; - -public class TumblingGroupedPreReducer<T> implements WindowBuffer<T>, CompletePreAggregator { - - private static final long serialVersionUID = 1L; - - private ReduceFunction<T> reducer; - private KeySelector<T, ?> keySelector; - - private Map<Object, T> reducedValues; - - private int numOfElements = 0; - private TypeSerializer<T> serializer; - - public TumblingGroupedPreReducer(ReduceFunction<T> reducer, KeySelector<T, ?> keySelector, - TypeSerializer<T> serializer) { - this.reducer = reducer; - this.serializer = serializer; - this.keySelector = keySelector; - this.reducedValues = new HashMap<Object, T>(); - } - - public boolean 
emitWindow(Collector<StreamWindow<T>> collector) { - - if (!reducedValues.isEmpty()) { - StreamWindow<T> currentWindow = new StreamWindow<T>(); - currentWindow.addAll(reducedValues.values()); - collector.collect(currentWindow); - reducedValues.clear(); - numOfElements = 0; - return true; - } else { - return false; - } - - } - - public void store(T element) throws Exception { - Object key = keySelector.getKey(element); - - T reduced = reducedValues.get(key); - - if (reduced == null) { - reduced = element; - } else { - reduced = reducer.reduce(serializer.copy(reduced), element); - } - - reducedValues.put(key, reduced); - numOfElements++; - } - - public void evict(int n) { - } - - public int size() { - return numOfElements; - } - - @Override - public TumblingGroupedPreReducer<T> clone() { - return new TumblingGroupedPreReducer<T>(reducer, keySelector, serializer); - } - - @Override - public String toString() { - return reducedValues.toString(); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/TumblingPreReducer.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/TumblingPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/TumblingPreReducer.java deleted file mode 100644 index f1e531e..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/TumblingPreReducer.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.operator.windowing; - -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.util.Collector; - -public class TumblingPreReducer<T> implements WindowBuffer<T>, CompletePreAggregator { - - private static final long serialVersionUID = 1L; - - private ReduceFunction<T> reducer; - - private T reduced; - private int numOfElements = 0; - private TypeSerializer<T> serializer; - - public TumblingPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer) { - this.reducer = reducer; - this.serializer = serializer; - } - - public boolean emitWindow(Collector<StreamWindow<T>> collector) { - if (reduced != null) { - StreamWindow<T> currentWindow = new StreamWindow<T>(); - currentWindow.add(reduced); - collector.collect(currentWindow); - reduced = null; - numOfElements = 0; - return true; - } else { - return false; - } - } - - public void store(T element) throws Exception { - if (reduced == null) { - reduced = element; - } else { - reduced = reducer.reduce(serializer.copy(reduced), element); - } - numOfElements++; - } - - public void evict(int n) { - } - - public int size() { - return numOfElements; - } - - @Override - public 
TumblingPreReducer<T> clone() { - return new TumblingPreReducer<T>(reducer, serializer); - } - - @Override - public String toString() { - return reduced.toString(); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBuffer.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBuffer.java deleted file mode 100644 index ef8fc2b..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBuffer.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.api.invokable.operator.windowing; - -import java.io.Serializable; - -import org.apache.flink.util.Collector; - -public interface WindowBuffer<T> extends Serializable { - - public void store(T element) throws Exception; - - public void evict(int n); - - public boolean emitWindow(Collector<StreamWindow<T>> collector); - - public int size(); - - public WindowBuffer<T> clone(); - -} http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java index 5f5e7d2..7eaea58 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.invokable.operator.windowing; import org.apache.flink.streaming.api.invokable.ChainableInvokable; +import org.apache.flink.streaming.api.windowing.StreamWindow; public class WindowFlattener<T> extends ChainableInvokable<StreamWindow<T>, T> { http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java ---------------------------------------------------------------------- diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java index 23aaf32..de93fab 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java @@ -17,29 +17,29 @@ package org.apache.flink.streaming.api.invokable.operator.windowing; -import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.streaming.api.function.WindowMapFunction; import org.apache.flink.streaming.api.invokable.operator.MapInvokable; +import org.apache.flink.streaming.api.windowing.StreamWindow; public class WindowMapper<IN, OUT> extends MapInvokable<StreamWindow<IN>, StreamWindow<OUT>> { private static final long serialVersionUID = 1L; - GroupReduceFunction<IN, OUT> reducer; + WindowMapFunction<IN, OUT> mapper; - public WindowMapper(GroupReduceFunction<IN, OUT> reducer) { - super(new WindowMapfunction<IN, OUT>(reducer)); - this.reducer = reducer; + public WindowMapper(WindowMapFunction<IN, OUT> mapper) { + super(new WindowMap<IN, OUT>(mapper)); + this.mapper = mapper; } - private static class WindowMapfunction<T, R> implements - MapFunction<StreamWindow<T>, StreamWindow<R>> { + private static class WindowMap<T, R> implements MapFunction<StreamWindow<T>, StreamWindow<R>> { private static final long serialVersionUID = 1L; - GroupReduceFunction<T, R> reducer; + WindowMapFunction<T, R> mapper; - public WindowMapfunction(GroupReduceFunction<T, R> reducer) { - this.reducer = reducer; + public WindowMap(WindowMapFunction<T, R> mapper) { + 
this.mapper = mapper; } @Override @@ -47,7 +47,7 @@ public class WindowMapper<IN, OUT> extends MapInvokable<StreamWindow<IN>, Stream StreamWindow<R> outputWindow = new StreamWindow<R>(window.windowID); outputWindow.numberOfParts = window.numberOfParts; - reducer.reduce(window, outputWindow); + mapper.mapWindow(window, outputWindow); return outputWindow; } http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java index 1766b0b..8601d06 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.flink.streaming.api.invokable.ChainableInvokable; +import org.apache.flink.streaming.api.windowing.StreamWindow; public class WindowMerger<T> extends ChainableInvokable<StreamWindow<T>, StreamWindow<T>> { http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java index e10692b..ea4451e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.invokable.operator.windowing; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.invokable.ChainableInvokable; +import org.apache.flink.streaming.api.windowing.StreamWindow; public class WindowPartitioner<T> extends ChainableInvokable<StreamWindow<T>, StreamWindow<T>> { http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducer.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducer.java index b4f965f..3a4bb69 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducer.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducer.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.invokable.operator.windowing; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import 
org.apache.flink.streaming.api.invokable.operator.MapInvokable; +import org.apache.flink.streaming.api.windowing.StreamWindow; public class WindowReducer<IN> extends MapInvokable<StreamWindow<IN>, StreamWindow<IN>> { http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java new file mode 100644 index 0000000..988058c --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.streaming.api.windowing;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.util.Collector;
+
+/**
+ * A window of stream elements, stored as an {@link ArrayList}. The window is
+ * also a {@link Collector}: every collected record is appended to the window
+ * itself. Each window carries a {@code windowID} and a {@code numberOfParts}
+ * counter that {@link #split(int)}, {@link #partitionBy(KeySelector)} and the
+ * {@code merge} methods set and update.
+ *
+ * @param <T>
+ *            type of the elements held by the window
+ */
+public class StreamWindow<T> extends ArrayList<T> implements Collector<T> {
+
+    private static final long serialVersionUID = -5150196421193988403L;
+
+    // Source of IDs for windows created through the no-argument constructor.
+    private static final Random rnd = new Random();
+
+    public int windowID;
+
+    public int numberOfParts;
+
+    /**
+     * Creates an empty window with a random ID and a single part.
+     */
+    public StreamWindow() {
+        this(rnd.nextInt(), 1);
+    }
+
+    /**
+     * Creates an empty window with the given ID and a single part.
+     *
+     * @param windowID
+     *            ID of the new window
+     */
+    public StreamWindow(int windowID) {
+        this(windowID, 1);
+    }
+
+    /**
+     * Creates an empty window with the given ID and number of parts.
+     *
+     * @param windowID
+     *            ID of the new window
+     * @param numberOfParts
+     *            number of parts recorded for the new window
+     */
+    public StreamWindow(int windowID, int numberOfParts) {
+        super();
+        this.windowID = windowID;
+        this.numberOfParts = numberOfParts;
+    }
+
+    /**
+     * Creates a shallow copy of the given window: ID and number of parts are
+     * taken over and the same element references are added.
+     *
+     * @param window
+     *            window to copy
+     */
+    public StreamWindow(StreamWindow<T> window) {
+        this(window.windowID, window.numberOfParts);
+        addAll(window);
+    }
+
+    /**
+     * Creates a copy of the given window in which every element is copied
+     * with {@code serializer.copy(element)}.
+     *
+     * @param window
+     *            window to copy
+     * @param serializer
+     *            serializer used to copy the individual elements
+     */
+    public StreamWindow(StreamWindow<T> window, TypeSerializer<T> serializer) {
+        this(window.windowID, window.numberOfParts);
+        for (T element : window) {
+            add(serializer.copy(element));
+        }
+    }
+
+    /**
+     * Groups the elements of this window by the key extracted with the given
+     * selector. Every returned window has the ID of this window and its
+     * number of parts set to the number of distinct keys.
+     *
+     * @param keySelector
+     *            extracts the grouping key of an element
+     * @return one window per distinct key; empty if this window is empty
+     * @throws Exception
+     *             if the key selector fails
+     */
+    public List<StreamWindow<T>> partitionBy(KeySelector<T, ?> keySelector) throws Exception {
+        Map<Object, StreamWindow<T>> partitions = new HashMap<Object, StreamWindow<T>>();
+
+        for (T value : this) {
+            Object key = keySelector.getKey(value);
+            StreamWindow<T> window = partitions.get(key);
+            if (window == null) {
+                // The real part count is only known once all keys have been seen.
+                window = new StreamWindow<T>(this.windowID, 0);
+                partitions.put(key, window);
+            }
+            window.add(value);
+        }
+
+        List<StreamWindow<T>> output = new ArrayList<StreamWindow<T>>();
+        int numkeys = partitions.size();
+
+        for (StreamWindow<T> window : partitions.values()) {
+            output.add(window.setNumberOfParts(numkeys));
+        }
+
+        return output;
+    }
+
+    /**
+     * Splits this window into {@code n} consecutive sub-windows of
+     * {@code size() / n} elements each; the last sub-window additionally
+     * receives the remainder. If {@code n} exceeds the number of elements,
+     * the window is split into one sub-window per element instead. Every
+     * sub-window has the ID of this window and its number of parts set to
+     * the number of sub-windows produced.
+     *
+     * @param n
+     *            requested number of sub-windows
+     * @return the sub-windows in element order; an empty list if {@code n}
+     *         is 0 or this window is empty
+     */
+    public List<StreamWindow<T>> split(int n) {
+        int numElements = size();
+        if (n == 0) {
+            // Also reached for an empty window, because the request is capped
+            // at the element count below. Without this guard numElements / n
+            // would divide by zero.
+            return new ArrayList<StreamWindow<T>>();
+        }
+        if (n > numElements) {
+            return split(numElements);
+        } else {
+            List<StreamWindow<T>> split = new ArrayList<StreamWindow<T>>();
+            int splitSize = numElements / n;
+
+            int index = -1;
+
+            StreamWindow<T> currentSubWindow = new StreamWindow<T>(windowID, n);
+            split.add(currentSubWindow);
+
+            for (T element : this) {
+                index++;
+                // Open the next sub-window once the current one is full; the
+                // n-th sub-window is never closed so it absorbs the remainder.
+                if (index == splitSize && split.size() < n) {
+                    currentSubWindow = new StreamWindow<T>(windowID, n);
+                    split.add(currentSubWindow);
+                    index = 0;
+                }
+                currentSubWindow.add(element);
+            }
+            return split;
+        }
+    }
+
+    /**
+     * Sets the number of parts of this window.
+     *
+     * @param n
+     *            new number of parts
+     * @return this window
+     */
+    public StreamWindow<T> setNumberOfParts(int n) {
+        this.numberOfParts = n;
+        return this;
+    }
+
+    /**
+     * Checks whether the other window can be merged into this one: both must
+     * have the same ID and this window must still have more than one part.
+     *
+     * @param otherWindow
+     *            candidate for merging
+     * @return true if the IDs match and {@code numberOfParts > 1}
+     */
+    public boolean compatibleWith(StreamWindow<T> otherWindow) {
+        return this.windowID == otherWindow.windowID && this.numberOfParts > 1;
+    }
+
+    /**
+     * Merges the given windows; see {@link #merge(List)} for the rules.
+     *
+     * @param windows
+     *            windows to merge, at least one
+     * @return the merged window
+     * @throws RuntimeException
+     *             if no window is given or a window is not compatible
+     */
+    public static <R> StreamWindow<R> merge(StreamWindow<R>... windows) {
+        // Delegating keeps both overloads in agreement, including the
+        // explicit failure when no window is passed.
+        return merge(Arrays.asList(windows));
+    }
+
+    /**
+     * Merges the given windows into a new window. The result starts as a
+     * copy of the first window; the elements of every following window are
+     * appended and the number of parts is decremented once per appended
+     * window.
+     *
+     * @param windows
+     *            windows to merge, at least one
+     * @return the merged window
+     * @throws RuntimeException
+     *             if the list is empty or a window is not compatible with
+     *             the result built so far
+     */
+    public static <R> StreamWindow<R> merge(List<StreamWindow<R>> windows) {
+        if (windows.isEmpty()) {
+            throw new RuntimeException("Need at least one window to merge");
+        } else {
+            StreamWindow<R> window = new StreamWindow<R>(windows.get(0));
+            for (int i = 1; i < windows.size(); i++) {
+                StreamWindow<R> next = windows.get(i);
+                if (window.compatibleWith(next)) {
+                    window.addAll(next);
+                    window.numberOfParts--;
+                } else {
+                    throw new RuntimeException("Can only merge compatible windows");
+                }
+            }
+            return window;
+        }
+    }
+
+    /**
+     * Compares the elements only, as defined by the list implementation;
+     * the window ID and the number of parts are not taken into account.
+     */
+    @Override
+    public boolean equals(Object o) {
+        return super.equals(o);
+    }
+
+    /**
+     * Appends the record to this window.
+     */
+    @Override
+    public void collect(T record) {
+        add(record);
+    }
+
+    /**
+     * Does nothing.
+     */
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public String toString() {
+        return super.toString();
+    }
+
+    /**
+     * Creates a window with a random ID and a single part that contains the
+     * given elements in order.
+     *
+     * @param elements
+     *            elements of the new window
+     * @return the new window
+     */
+    public static <R> StreamWindow<R> fromElements(R... elements) {
+        StreamWindow<R> window = new StreamWindow<R>();
+        for (R element : elements) {
+            window.add(element);
+        }
+        return window;
+    }
+}