[FLINK-3200] Use Partitioned State in WindowOperator

This changes the window operator to use the new partitioned state abstraction for keeping window contents, instead of custom internal state and the checkpointed interface.
For now, however, timers are still kept as custom checkpointed state. WindowOperator now expects a StateDescriptor for a MergingState; this can be either a ReducingState or a ListState, but WindowOperator is agnostic to the type of State. The signature of WindowFunction is also changed to include the type of the intermediate input. For example, if a ReducingState is used, the input of the WindowFunction is T (where T is the input type); if a ListState is used, the input of the WindowFunction is Iterable<T>. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/67ca4a43 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/67ca4a43 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/67ca4a43 Branch: refs/heads/master Commit: 67ca4a436daf8de1a6e0329b4b30342e77d26087 Parents: caf4672 Author: Aljoscha Krettek <[email protected]> Authored: Mon Jan 25 12:34:05 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Wed Feb 3 20:27:51 2016 +0100 ---------------------------------------------------------------------- .../ml/IncrementalLearningSkeleton.java | 2 +- .../GroupedProcessingTimeWindowExample.java | 2 +- .../examples/windowing/SessionWindowing.java | 11 +- .../examples/windowing/TopSpeedWindowing.java | 2 +- .../examples/windowing/TopSpeedWindowing.scala | 2 +- .../api/datastream/AllWindowedStream.java | 37 +- .../api/datastream/CoGroupedStreams.java | 2 +- .../api/datastream/WindowedStream.java | 118 ++-- .../aggregation/AggregationFunction.java | 4 +- .../functions/windowing/AllWindowFunction.java | 4 +- .../windowing/FoldAllWindowFunction.java | 2 +- .../windowing/FoldWindowFunction.java | 2 +- .../windowing/ReduceAllWindowFunction.java | 44 +- .../windowing/ReduceApplyAllWindowFunction.java | 54 ++ .../windowing/ReduceApplyWindowFunction.java | 54 ++ .../ReduceIterableAllWindowFunction.java | 46 ++ .../windowing/ReduceIterableWindowFunction.java | 46 ++ 
.../windowing/ReduceWindowFunction.java | 26 +- .../ReduceWindowFunctionWithWindow.java | 44 +- .../api/functions/windowing/WindowFunction.java | 4 +- .../triggers/ContinuousEventTimeTrigger.java | 8 +- .../ContinuousProcessingTimeTrigger.java | 11 +- .../api/windowing/triggers/CountTrigger.java | 9 +- .../api/windowing/triggers/DeltaTrigger.java | 18 +- .../api/windowing/triggers/Trigger.java | 57 +- .../windowing/AccumulatingKeyedTimePanes.java | 8 +- ...ccumulatingProcessingTimeWindowOperator.java | 6 +- .../EvictingNonKeyedWindowOperator.java | 2 +- .../windowing/EvictingWindowOperator.java | 119 +++- .../windowing/NonKeyedWindowOperator.java | 73 ++- .../operators/windowing/WindowOperator.java | 570 ++++++++----------- .../flink/streaming/api/DataStreamTest.java | 2 +- .../api/complex/ComplexIntegrationTest.java | 59 +- ...AlignedProcessingTimeWindowOperatorTest.java | 10 +- ...AlignedProcessingTimeWindowOperatorTest.java | 22 +- .../windowing/AllWindowTranslationTest.java | 10 +- .../EvictingNonKeyedWindowOperatorTest.java | 32 +- .../windowing/EvictingWindowOperatorTest.java | 131 ++++- .../windowing/NonKeyedWindowOperatorTest.java | 46 +- .../windowing/TimeWindowTranslationTest.java | 8 +- .../operators/windowing/WindowOperatorTest.java | 275 ++++++--- .../windowing/WindowTranslationTest.java | 50 +- .../streaming/api/scala/AllWindowedStream.scala | 19 +- .../streaming/api/scala/WindowedStream.scala | 19 +- .../api/scala/AllWindowTranslationTest.scala | 41 +- .../api/scala/WindowTranslationTest.scala | 46 +- .../EventTimeAllWindowCheckpointingITCase.java | 54 +- .../EventTimeWindowCheckpointingITCase.java | 116 ++-- .../WindowCheckpointingITCase.java | 35 +- 49 files changed, 1399 insertions(+), 963 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java index 32cf430..8f502dd 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java @@ -172,7 +172,7 @@ public class IncrementalLearningSkeleton { /** * Builds up-to-date partial models on new training data. */ - public static class PartialModelBuilder implements AllWindowFunction<Integer, Double[], TimeWindow> { + public static class PartialModelBuilder implements AllWindowFunction<Iterable<Integer>, Double[], TimeWindow> { private static final long serialVersionUID = 1L; protected Double[] buildPartialModel(Iterable<Integer> values) { http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java index f08069b..196b73e 100644 --- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java @@ -104,7 +104,7 @@ public class GroupedProcessingTimeWindowExample { } } - public static class SummingWindowFunction implements WindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long, Window> { + public static class SummingWindowFunction implements WindowFunction<Iterable<Tuple2<Long, Long>>, Tuple2<Long, Long>, Long, Window> { @Override public void apply(Long key, Window window, Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) { http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java index b86830d..69f61bc 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java @@ -17,7 +17,10 @@ package org.apache.flink.streaming.examples.windowing; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.TimeCharacteristic; import 
org.apache.flink.streaming.api.datastream.DataStream; @@ -100,6 +103,10 @@ public class SessionWindowing { private final Long sessionTimeout; + private final ValueStateDescriptor<Long> stateDesc = new ValueStateDescriptor<>("last-seen", 1L, + BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig())); + + public SessionTrigger(Long sessionTimeout) { this.sessionTimeout = sessionTimeout; @@ -108,7 +115,7 @@ public class SessionWindowing { @Override public TriggerResult onElement(Tuple3<String, Long, Integer> element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception { - ValueState<Long> lastSeenState = ctx.getKeyValueState("last-seen", 1L); + ValueState<Long> lastSeenState = ctx.getPartitionedState(stateDesc); Long lastSeen = lastSeenState.value(); Long timeSinceLastEvent = timestamp - lastSeen; @@ -127,7 +134,7 @@ public class SessionWindowing { @Override public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception { - ValueState<Long> lastSeenState = ctx.getKeyValueState("last-seen", 1L); + ValueState<Long> lastSeenState = ctx.getPartitionedState(stateDesc); Long lastSeen = lastSeenState.value(); if (time - lastSeen >= sessionTimeout) { http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java index 30eda67..5a56a40 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java +++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java @@ -85,7 +85,7 @@ public class TopSpeedWindowing { Tuple4<Integer, Integer, Double, Long> newDataPoint) { return newDataPoint.f2 - oldDataPoint.f2; } - })) + }, carData.getType().createSerializer(env.getConfig()))) .maxBy(1); if (fileOutput) { http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala index f26f32c..c30e654 100644 --- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala +++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala @@ -72,7 +72,7 @@ object TopSpeedWindowing { .evictor(TimeEvictor.of(Time.of(evictionSec * 1000, TimeUnit.MILLISECONDS))) .trigger(DeltaTrigger.of(triggerMeters, new DeltaFunction[CarEvent] { def getDelta(oldSp: CarEvent, newSp: CarEvent): Double = newSp.distance - oldSp.distance - })) + }, cars.getType().createSerializer(env.getConfig))) // .window(Time.of(evictionSec * 1000, (car : CarEvent) => car.time)) // .every(Delta.of[CarEvent](triggerMeters, // (oldSp,newSp) => newSp.distance-oldSp.distance, CarEvent(0,0,0,0))) http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java index 989e762..8cef5ea 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java @@ -21,8 +21,10 @@ package org.apache.flink.streaming.api.datastream; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.Utils; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -30,8 +32,9 @@ import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction; import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator; import org.apache.flink.streaming.api.functions.aggregation.SumAggregator; import org.apache.flink.streaming.api.functions.windowing.FoldAllWindowFunction; -import org.apache.flink.streaming.api.functions.windowing.ReduceAllWindowFunction; import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.ReduceApplyAllWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.ReduceIterableAllWindowFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.evictors.Evictor; 
@@ -126,6 +129,11 @@ public class AllWindowedStream<T, W extends Window> { * @return The data stream that is the result of applying the reduce function to the window. */ public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> function) { + if (function instanceof RichFunction) { + throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction. " + + "Please use apply(ReduceFunction, WindowFunction) instead."); + } + //clean the closure function = input.getExecutionEnvironment().clean(function); @@ -147,7 +155,7 @@ public class AllWindowedStream<T, W extends Window> { operator = new EvictingNonKeyedWindowOperator<>(windowAssigner, windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), new HeapWindowBuffer.Factory<T>(), - new ReduceAllWindowFunction<W, T>(function), + new ReduceIterableAllWindowFunction<W, T>(function), trigger, evictor).enableSetProcessingTime(setProcessingTime); @@ -155,7 +163,7 @@ public class AllWindowedStream<T, W extends Window> { operator = new NonKeyedWindowOperator<>(windowAssigner, windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), new PreAggregatingHeapWindowBuffer.Factory<>(function), - new ReduceAllWindowFunction<W, T>(function), + new ReduceIterableAllWindowFunction<W, T>(function), trigger).enableSetProcessingTime(setProcessingTime); } @@ -205,10 +213,11 @@ public class AllWindowedStream<T, W extends Window> { * @param function The window function. * @return The data stream that is the result of applying the window function to the window. 
*/ - public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<T, R, W> function) { - TypeInformation<T> inType = input.getType(); + public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<Iterable<T>, R, W> function) { + @SuppressWarnings("unchecked, rawtypes") + TypeInformation<Iterable<T>> iterTypeInfo = new GenericTypeInfo<>((Class) Iterable.class); TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( - function, AllWindowFunction.class, true, true, inType, null, false); + function, AllWindowFunction.class, true, true, iterTypeInfo, null, false); return apply(function, resultType); } @@ -224,7 +233,7 @@ public class AllWindowedStream<T, W extends Window> { * @param function The window function. * @return The data stream that is the result of applying the window function to the window. */ - public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) { + public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<Iterable<T>, R, W> function, TypeInformation<R> resultType) { //clean the closure function = input.getExecutionEnvironment().clean(function); @@ -297,6 +306,10 @@ public class AllWindowedStream<T, W extends Window> { * @return The data stream that is the result of applying the window function to the window. 
*/ public <R> SingleOutputStreamOperator<R, ?> apply(ReduceFunction<T> preAggregator, AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) { + if (preAggregator instanceof RichFunction) { + throw new UnsupportedOperationException("Pre-aggregator of apply can not be a RichFunction."); + } + //clean the closures function = input.getExecutionEnvironment().clean(function); preAggregator = input.getExecutionEnvironment().clean(preAggregator); @@ -314,16 +327,16 @@ public class AllWindowedStream<T, W extends Window> { operator = new EvictingNonKeyedWindowOperator<>(windowAssigner, windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), new HeapWindowBuffer.Factory<T>(), - function, + new ReduceApplyAllWindowFunction<>(preAggregator, function), trigger, evictor).enableSetProcessingTime(setProcessingTime); } else { operator = new NonKeyedWindowOperator<>(windowAssigner, - windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), - new PreAggregatingHeapWindowBuffer.Factory<>(preAggregator), - function, - trigger).enableSetProcessingTime(setProcessingTime); + windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), + new PreAggregatingHeapWindowBuffer.Factory<>(preAggregator), + new ReduceApplyAllWindowFunction<>(preAggregator, function), + trigger).enableSetProcessingTime(setProcessingTime); } return input.transform(opName, resultType, operator).setParallelism(1); http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java index d1da783..3903015 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java @@ -545,7 +545,7 @@ public class CoGroupedStreams<T1, T2> { private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window> extends WrappingFunction<CoGroupFunction<T1, T2, T>> - implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> { + implements WindowFunction<Iterable<TaggedUnion<T1, T2>>, T, KEY, W> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index 9dbee30..d64248f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -21,9 +21,13 @@ package org.apache.flink.streaming.api.datastream; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import 
org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -31,8 +35,10 @@ import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction; import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator; import org.apache.flink.streaming.api.functions.aggregation.SumAggregator; import org.apache.flink.streaming.api.functions.windowing.FoldWindowFunction; -import org.apache.flink.streaming.api.functions.windowing.WindowFunction; +import org.apache.flink.streaming.api.functions.windowing.ReduceApplyWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.ReduceIterableWindowFunction; import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows; @@ -46,8 +52,8 @@ import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProces import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator; import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator; import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator; -import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer; -import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; /** * A {@code WindowedStream} represents a data stream where elements are grouped by @@ -136,7 +142,13 @@ public class WindowedStream<T, K, W extends Window> { * @param function The reduce 
function. * @return The data stream that is the result of applying the reduce function to the window. */ + @SuppressWarnings("unchecked") public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> function) { + if (function instanceof RichFunction) { + throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction. " + + "Please use apply(ReduceFunction, WindowFunction) instead."); + } + //clean the closure function = input.getExecutionEnvironment().clean(function); @@ -156,23 +168,30 @@ public class WindowedStream<T, K, W extends Window> { boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime; if (evictor != null) { + ListStateDescriptor<StreamRecord<T>> stateDesc = new ListStateDescriptor<>("window-contents", + new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig()))); + operator = new EvictingWindowOperator<>(windowAssigner, - windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), - keySel, - input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), - new HeapWindowBuffer.Factory<T>(), - new ReduceWindowFunction<K, W, T>(function), - trigger, - evictor).enableSetProcessingTime(setProcessingTime); + windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), + keySel, + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + stateDesc, + new ReduceIterableWindowFunction<K, W, T>(function), + trigger, + evictor).enableSetProcessingTime(setProcessingTime); } else { + ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents", + function, + input.getType().createSerializer(getExecutionEnvironment().getConfig())); + operator = new WindowOperator<>(windowAssigner, - windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), - keySel, - 
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), - new PreAggregatingHeapWindowBuffer.Factory<>(function), - new ReduceWindowFunction<K, W, T>(function), - trigger).enableSetProcessingTime(setProcessingTime); + windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), + keySel, + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + stateDesc, + new ReduceWindowFunction<K, W, T>(), + trigger).enableSetProcessingTime(setProcessingTime); } return input.transform(opName, input.getType(), operator); @@ -222,10 +241,11 @@ public class WindowedStream<T, K, W extends Window> { * @param function The window function. * @return The data stream that is the result of applying the window function to the window. */ - public <R> SingleOutputStreamOperator<R, ?> apply(WindowFunction<T, R, K, W> function) { - TypeInformation<T> inType = input.getType(); + public <R> SingleOutputStreamOperator<R, ?> apply(WindowFunction<Iterable<T>, R, K, W> function) { + @SuppressWarnings("unchecked, rawtypes") + TypeInformation<Iterable<T>> iterTypeInfo = new GenericTypeInfo<>((Class) Iterable.class); TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( - function, WindowFunction.class, true, true, inType, null, false); + function, WindowFunction.class, true, true, iterTypeInfo, null, false); return apply(function, resultType); } @@ -243,7 +263,8 @@ public class WindowedStream<T, K, W extends Window> { * @param resultType Type information for the result type of the window function * @return The data stream that is the result of applying the window function to the window. 
*/ - public <R> SingleOutputStreamOperator<R, ?> apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) { + public <R> SingleOutputStreamOperator<R, ?> apply(WindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> resultType) { + //clean the closure function = input.getExecutionEnvironment().clean(function); @@ -259,26 +280,33 @@ public class WindowedStream<T, K, W extends Window> { String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")"; KeySelector<T, K> keySel = input.getKeySelector(); - WindowOperator<K, T, R, W> operator; + WindowOperator<K, T, Iterable<T>, R, W> operator; boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime; + if (evictor != null) { + ListStateDescriptor<StreamRecord<T>> stateDesc = new ListStateDescriptor<>("window-contents", + new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig()))); + operator = new EvictingWindowOperator<>(windowAssigner, windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), keySel, input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), - new HeapWindowBuffer.Factory<T>(), + stateDesc, function, trigger, evictor).enableSetProcessingTime(setProcessingTime); } else { + ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents", + input.getType().createSerializer(getExecutionEnvironment().getConfig())); + operator = new WindowOperator<>(windowAssigner, windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), keySel, input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), - new HeapWindowBuffer.Factory<T>(), + stateDesc, function, trigger).enableSetProcessingTime(setProcessingTime); } @@ -294,17 +322,17 @@ public class WindowedStream<T, K, W extends Window> { * <p> * Arriving data is pre-aggregated using the given pre-aggregation reducer. 
* - * @param preAggregator The reduce function that is used for pre-aggregation + * @param reduceFunction The reduce function that is used for pre-aggregation * @param function The window function. * @return The data stream that is the result of applying the window function to the window. */ - public <R> SingleOutputStreamOperator<R, ?> apply(ReduceFunction<T> preAggregator, WindowFunction<T, R, K, W> function) { + public <R> SingleOutputStreamOperator<R, ?> apply(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) { TypeInformation<T> inType = input.getType(); TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( function, WindowFunction.class, true, true, inType, null, false); - return apply(preAggregator, function, resultType); + return apply(reduceFunction, function, resultType); } /** @@ -315,15 +343,19 @@ public class WindowedStream<T, K, W extends Window> { * <p> * Arriving data is pre-aggregated using the given pre-aggregation reducer. * - * @param preAggregator The reduce function that is used for pre-aggregation + * @param reduceFunction The reduce function that is used for pre-aggregation * @param function The window function. * @param resultType Type information for the result type of the window function * @return The data stream that is the result of applying the window function to the window. 
*/ - public <R> SingleOutputStreamOperator<R, ?> apply(ReduceFunction<T> preAggregator, WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) { + public <R> SingleOutputStreamOperator<R, ?> apply(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) { + if (reduceFunction instanceof RichFunction) { + throw new UnsupportedOperationException("Pre-aggregator of apply can not be a RichFunction."); + } + //clean the closures function = input.getExecutionEnvironment().clean(function); - preAggregator = input.getExecutionEnvironment().clean(preAggregator); + reduceFunction = input.getExecutionEnvironment().clean(reduceFunction); String callLocation = Utils.getCallLocationName(); String udfName = "WindowApply at " + callLocation; @@ -336,21 +368,29 @@ public class WindowedStream<T, K, W extends Window> { boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime; if (evictor != null) { + + ListStateDescriptor<StreamRecord<T>> stateDesc = new ListStateDescriptor<>("window-contents", + new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig()))); + operator = new EvictingWindowOperator<>(windowAssigner, - windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), - keySel, - input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), - new HeapWindowBuffer.Factory<T>(), - function, - trigger, - evictor).enableSetProcessingTime(setProcessingTime); + windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), + keySel, + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + stateDesc, + new ReduceApplyWindowFunction<>(reduceFunction, function), + trigger, + evictor).enableSetProcessingTime(setProcessingTime); } else { + ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents", + reduceFunction, + 
input.getType().createSerializer(getExecutionEnvironment().getConfig())); + operator = new WindowOperator<>(windowAssigner, windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), keySel, input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), - new PreAggregatingHeapWindowBuffer.Factory<>(preAggregator), + stateDesc, function, trigger).enableSetProcessingTime(setProcessingTime); } @@ -587,7 +627,7 @@ public class WindowedStream<T, K, W extends Window> { } else if (function instanceof WindowFunction) { @SuppressWarnings("unchecked") - WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function; + WindowFunction<Iterable<T>, R, K, TimeWindow> wf = (WindowFunction<Iterable<T>, R, K, TimeWindow>) function; OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>( wf, input.getKeySelector(), @@ -619,7 +659,7 @@ public class WindowedStream<T, K, W extends Window> { } else if (function instanceof WindowFunction) { @SuppressWarnings("unchecked") - WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function; + WindowFunction<Iterable<T>, R, K, TimeWindow> wf = (WindowFunction<Iterable<T>, R, K, TimeWindow>) function; OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>( wf, input.getKeySelector(), http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java index ed39103..fe711a5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java @@ -17,9 +17,9 @@ package org.apache.flink.streaming.api.functions.aggregation; -import org.apache.flink.api.common.functions.RichReduceFunction; +import org.apache.flink.api.common.functions.ReduceFunction; -public abstract class AggregationFunction<T> extends RichReduceFunction<T> { +public abstract class AggregationFunction<T> implements ReduceFunction<T> { private static final long serialVersionUID = 1L; public enum AggregationType { http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java index 1d54436..b66bac6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java @@ -30,7 +30,7 @@ import java.io.Serializable; * @param <IN> The type of the input value. * @param <OUT> The type of the output value. */ -public interface AllWindowFunction<IN, OUT, W extends Window> extends Function, Serializable { +public interface AllWindowFunction<IN, OUT, W extends Window> extends Function, Serializable { /** * Evaluates the window and outputs none or several elements. @@ -41,5 +41,5 @@ public interface AllWindowFunction<IN, OUT, W extends Window> extends Function, * * @throws Exception The function may throw exceptions to fail the program and trigger recovery. 
*/ - void apply(W window, Iterable<IN> values, Collector<OUT> out) throws Exception; + void apply(W window, IN values, Collector<OUT> out) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java index af32f9b..46f9b3c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java @@ -36,7 +36,7 @@ import java.io.IOException; public class FoldAllWindowFunction<W extends Window, T, R> extends WrappingFunction<FoldFunction<T, R>> - implements AllWindowFunction<T, R, W>, OutputTypeConfigurable<R> { + implements AllWindowFunction<Iterable<T>, R, W>, OutputTypeConfigurable<R> { private static final long serialVersionUID = 1L; private byte[] serializedInitialValue; http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java index b1eb3cd..db6d1bb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java @@ -36,7 +36,7 @@ import java.io.ObjectInputStream; public class FoldWindowFunction<K, W extends Window, T, R> extends WrappingFunction<FoldFunction<T, R>> - implements WindowFunction<T, R, K, W>, OutputTypeConfigurable<R> { + implements WindowFunction<Iterable<T>, R, K, W>, OutputTypeConfigurable<R> { private static final long serialVersionUID = 1L; private byte[] serializedInitialValue; http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java index 24855a5..76b095b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java @@ -17,54 +17,14 @@ */ package org.apache.flink.streaming.api.functions.windowing; -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.common.functions.util.FunctionUtils; -import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; public class ReduceAllWindowFunction<W extends Window, T> extends RichAllWindowFunction<T, T, W> { private static final long serialVersionUID = 1L; - private final ReduceFunction<T> reduceFunction; - - public ReduceAllWindowFunction(ReduceFunction<T> reduceFunction) { - this.reduceFunction = 
reduceFunction; - } - - @Override - public void setRuntimeContext(RuntimeContext ctx) { - super.setRuntimeContext(ctx); - FunctionUtils.setFunctionRuntimeContext(reduceFunction, ctx); - } - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - FunctionUtils.openFunction(reduceFunction, parameters); - } - @Override - public void close() throws Exception { - super.close(); - FunctionUtils.closeFunction(reduceFunction); - } - - @Override - public void apply(W window, Iterable<T> values, Collector<T> out) throws Exception { - T result = null; - - for (T v: values) { - if (result == null) { - result = v; - } else { - result = reduceFunction.reduce(result, v); - } - } - - if (result != null) { - out.collect(result); - } + public void apply(W window, T input, Collector<T> out) throws Exception { + out.collect(input); } } http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyAllWindowFunction.java new file mode 100644 index 0000000..f9fb771 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyAllWindowFunction.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.windowing; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.Collector; + +public class ReduceApplyAllWindowFunction<W extends Window, T, R> + extends WrappingFunction<AllWindowFunction<T, R, W>> + implements AllWindowFunction<Iterable<T>, R, W> { + + private static final long serialVersionUID = 1L; + + private final ReduceFunction<T> reduceFunction; + private final AllWindowFunction<T, R, W> windowFunction; + + public ReduceApplyAllWindowFunction(ReduceFunction<T> reduceFunction, + AllWindowFunction<T, R, W> windowFunction) { + super(windowFunction); + this.reduceFunction = reduceFunction; + this.windowFunction = windowFunction; + } + + @Override + public void apply(W window, Iterable<T> input, Collector<R> out) throws Exception { + + T curr = null; + for (T val: input) { + if (curr == null) { + curr = val; + } else { + curr = reduceFunction.reduce(curr, val); + } + } + windowFunction.apply(window, curr, out); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyWindowFunction.java 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyWindowFunction.java new file mode 100644 index 0000000..bf52e9b --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyWindowFunction.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.functions.windowing; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.Collector; + +public class ReduceApplyWindowFunction<K, W extends Window, T, R> + extends WrappingFunction<WindowFunction<T, R, K, W>> + implements WindowFunction<Iterable<T>, R, K, W> { + + private static final long serialVersionUID = 1L; + + private final ReduceFunction<T> reduceFunction; + private final WindowFunction<T, R, K, W> windowFunction; + + public ReduceApplyWindowFunction(ReduceFunction<T> reduceFunction, + WindowFunction<T, R, K, W> windowFunction) { + super(windowFunction); + this.reduceFunction = reduceFunction; + this.windowFunction = windowFunction; + } + + @Override + public void apply(K k, W window, Iterable<T> input, Collector<R> out) throws Exception { + + T curr = null; + for (T val: input) { + if (curr == null) { + curr = val; + } else { + curr = reduceFunction.reduce(curr, val); + } + } + windowFunction.apply(k, window, curr, out); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableAllWindowFunction.java new file mode 100644 index 0000000..2283fe7 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableAllWindowFunction.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.windowing; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.Collector; + +public class ReduceIterableAllWindowFunction<W extends Window, T> implements AllWindowFunction<Iterable<T>, T, W> { + private static final long serialVersionUID = 1L; + + private final ReduceFunction<T> reduceFunction; + + public ReduceIterableAllWindowFunction(ReduceFunction<T> reduceFunction) { + this.reduceFunction = reduceFunction; + } + + @Override + public void apply(W window, Iterable<T> input, Collector<T> out) throws Exception { + + T curr = null; + for (T val: input) { + if (curr == null) { + curr = val; + } else { + curr = reduceFunction.reduce(curr, val); + } + } + out.collect(curr); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableWindowFunction.java 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableWindowFunction.java new file mode 100644 index 0000000..063cee4 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableWindowFunction.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.functions.windowing; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.Collector; + +public class ReduceIterableWindowFunction<K, W extends Window, T> implements WindowFunction<Iterable<T>, T, K, W> { + private static final long serialVersionUID = 1L; + + private final ReduceFunction<T> reduceFunction; + + public ReduceIterableWindowFunction(ReduceFunction<T> reduceFunction) { + this.reduceFunction = reduceFunction; + } + + @Override + public void apply(K k, W window, Iterable<T> input, Collector<T> out) throws Exception { + + T curr = null; + for (T val: input) { + if (curr == null) { + curr = val; + } else { + curr = reduceFunction.reduce(curr, val); + } + } + out.collect(curr); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java index edd8a34..8be4553b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java @@ -17,34 +17,14 @@ */ package org.apache.flink.streaming.api.functions.windowing; -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.java.operators.translation.WrappingFunction; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; -public class ReduceWindowFunction<K, W extends Window, T> - extends 
WrappingFunction<ReduceFunction<T>> - implements WindowFunction<T, T, K, W> { +public class ReduceWindowFunction<K, W extends Window, T> implements WindowFunction<T, T, K, W> { private static final long serialVersionUID = 1L; - public ReduceWindowFunction(ReduceFunction<T> reduceFunction) { - super(reduceFunction); - } - @Override - public void apply(K k, W window, Iterable<T> values, Collector<T> out) throws Exception { - T result = null; - - for (T v: values) { - if (result == null) { - result = v; - } else { - result = wrappedFunction.reduce(result, v); - } - } - - if (result != null) { - out.collect(result); - } + public void apply(K k, W window, T input, Collector<T> out) throws Exception { + out.collect(input); } } http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java index 6a472b1..fe42cd3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java @@ -17,55 +17,15 @@ */ package org.apache.flink.streaming.api.functions.windowing; -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.windowing.windows.Window; import 
org.apache.flink.util.Collector; public class ReduceWindowFunctionWithWindow<K, W extends Window, T> extends RichWindowFunction<T, Tuple2<W, T>, K, W> { private static final long serialVersionUID = 1L; - private final ReduceFunction<T> reduceFunction; - - public ReduceWindowFunctionWithWindow(ReduceFunction<T> reduceFunction) { - this.reduceFunction = reduceFunction; - } - - @Override - public void setRuntimeContext(RuntimeContext ctx) { - super.setRuntimeContext(ctx); - FunctionUtils.setFunctionRuntimeContext(reduceFunction, ctx); - } - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - FunctionUtils.openFunction(reduceFunction, parameters); - } - @Override - public void close() throws Exception { - super.close(); - FunctionUtils.closeFunction(reduceFunction); - } - - @Override - public void apply(K k, W window, Iterable<T> values, Collector<Tuple2<W, T>> out) throws Exception { - T result = null; - - for (T v: values) { - if (result == null) { - result = v; - } else { - result = reduceFunction.reduce(result, v); - } - } - - if (result != null) { - out.collect(Tuple2.of(window, result)); - } + public void apply(K k, W window, T input, Collector<Tuple2<W, T>> out) throws Exception { + out.collect(Tuple2.of(window, input)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java index eda12c0..204d6a5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java @@ -38,10 +38,10 @@ public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function * * @param key The key for which this window is evaluated. * @param window The window that is being evaluated. - * @param values The elements in the window being evaluated. + * @param input The elements in the window being evaluated. * @param out A collector for emitting elements. * * @throws Exception The function may throw exceptions to fail the program and trigger recovery. */ - void apply(KEY key, W window, Iterable<IN> values, Collector<OUT> out) throws Exception; + void apply(KEY key, W window, IN input, Collector<OUT> out) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java index b653be3..17818af 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java @@ -18,7 +18,10 @@ package org.apache.flink.streaming.api.windowing.triggers; import com.google.common.annotations.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.streaming.api.windowing.time.Time; import 
org.apache.flink.streaming.api.windowing.windows.Window; @@ -35,6 +38,9 @@ public class ContinuousEventTimeTrigger<W extends Window> implements Trigger<Obj private final long interval; + private final ValueStateDescriptor<Boolean> stateDesc = new ValueStateDescriptor<>("first", true, + BasicTypeInfo.BOOLEAN_TYPE_INFO.createSerializer(new ExecutionConfig())); + private ContinuousEventTimeTrigger(long interval) { this.interval = interval; } @@ -42,7 +48,7 @@ public class ContinuousEventTimeTrigger<W extends Window> implements Trigger<Obj @Override public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception { - ValueState<Boolean> first = ctx.getKeyValueState("first", true); + ValueState<Boolean> first = ctx.getPartitionedState(stateDesc); if (first.value()) { long start = timestamp - (timestamp % interval); http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java index 7f3e7ec..20a2274 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java @@ -18,7 +18,10 @@ package org.apache.flink.streaming.api.windowing.triggers; import com.google.common.annotations.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import 
org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.Window; @@ -33,6 +36,10 @@ public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigge private final long interval; + private final ValueStateDescriptor<Long> stateDesc = new ValueStateDescriptor<>("fire-timestamp", 0L, + BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig())); + + private ContinuousProcessingTimeTrigger(long interval) { this.interval = interval; } @@ -41,7 +48,7 @@ public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigge public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception { long currentTime = System.currentTimeMillis(); - ValueState<Long> fireState = ctx.getKeyValueState("fire-timestamp", 0L); + ValueState<Long> fireState = ctx.getPartitionedState(stateDesc); long nextFireTimestamp = fireState.value(); if (nextFireTimestamp == 0) { @@ -70,7 +77,7 @@ public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigge @Override public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception { - ValueState<Long> fireState = ctx.getKeyValueState("fire-timestamp", 0L); + ValueState<Long> fireState = ctx.getPartitionedState(stateDesc); long nextFireTimestamp = fireState.value(); // only fire if an element didn't already fire http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java index d101fe1..e8742d5 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java @@ -17,7 +17,10 @@ */ package org.apache.flink.streaming.api.windowing.triggers; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.streaming.api.windowing.windows.Window; import java.io.IOException; @@ -32,13 +35,17 @@ public class CountTrigger<W extends Window> implements Trigger<Object, W> { private final long maxCount; + private final ValueStateDescriptor<Long> stateDesc = new ValueStateDescriptor<>("count", 0L, + BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig())); + + private CountTrigger(long maxCount) { this.maxCount = maxCount; } @Override public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws IOException { - ValueState<Long> count = ctx.getKeyValueState("count", 0L); + ValueState<Long> count = ctx.getPartitionedState(stateDesc); long currentCount = count.value() + 1; count.update(currentCount); if (currentCount >= maxCount) { http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java index 37c8a45..60ada88 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java @@ -18,11 +18,11 @@ package org.apache.flink.streaming.api.windowing.triggers; import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction; import org.apache.flink.streaming.api.windowing.windows.Window; -import java.io.Serializable; - /** * A {@link Trigger} that fires based on a {@link DeltaFunction} and a threshold. * @@ -33,20 +33,23 @@ import java.io.Serializable; * * @param <W> The type of {@link Window Windows} on which this trigger can operate. */ -public class DeltaTrigger<T extends Serializable, W extends Window> implements Trigger<T, W> { +public class DeltaTrigger<T, W extends Window> implements Trigger<T, W> { private static final long serialVersionUID = 1L; private final DeltaFunction<T> deltaFunction; private final double threshold; + private final ValueStateDescriptor<T> stateDesc; - private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction) { + private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction, TypeSerializer<T> stateSerializer) { this.deltaFunction = deltaFunction; this.threshold = threshold; + stateDesc = new ValueStateDescriptor<>("last-element", null, stateSerializer); + } @Override public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception { - ValueState<T> lastElementState = ctx.getKeyValueState("last-element", null); + ValueState<T> lastElementState = ctx.getPartitionedState(stateDesc); if (lastElementState.value() == null) { lastElementState.update(element); return TriggerResult.CONTINUE; @@ -78,11 +81,12 @@ public class DeltaTrigger<T extends Serializable, W extends Window> implements T * * @param threshold The threshold at which to trigger. 
* @param deltaFunction The delta function to use + * @param stateSerializer TypeSerializer for the data elements. * * @param <T> The type of elements on which this trigger can operate. * @param <W> The type of {@link Window Windows} on which this trigger can operate. */ - public static <T extends Serializable, W extends Window> DeltaTrigger<T, W> of(double threshold, DeltaFunction<T> deltaFunction) { - return new DeltaTrigger<>(threshold, deltaFunction); + public static <T, W extends Window> DeltaTrigger<T, W> of(double threshold, DeltaFunction<T> deltaFunction, TypeSerializer<T> stateSerializer) { + return new DeltaTrigger<>(threshold, deltaFunction, stateSerializer); } } http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java index aed393b..8ea50b3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java @@ -17,7 +17,10 @@ */ package org.apache.flink.streaming.api.windowing.triggers; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.windowing.windows.Window; import java.io.Serializable; @@ -142,20 +145,62 @@ public interface Trigger<T, W extends Window> extends Serializable { * Register an event-time callback. 
When the current watermark passes the specified * time {@link #onEventTime(long, Window, TriggerContext)} is called with the time specified here. * - * @see org.apache.flink.streaming.api.watermark.Watermark - * * @param time The watermark at which to invoke {@link #onEventTime(long, Window, TriggerContext)} + * @see org.apache.flink.streaming.api.watermark.Watermark */ void registerEventTimeTimer(long time); /** - * Retrieves an {@link ValueState} object that can be used to interact with + * Retrieves a {@link State} object that can be used to interact with + * fault-tolerant state that is scoped to the window and key of the current + * trigger invocation. + * + * @param stateDescriptor The StateDescriptor that contains the name and type of the + * state that is being accessed. + * @param <S> The type of the state. + * @return The partitioned state object. + * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the + * function (function is not part of a KeyedStream). + */ + <S extends State> S getPartitionedState(StateDescriptor<S> stateDescriptor); + + /** + * Retrieves a {@link ValueState} object that can be used to interact with * fault-tolerant state that is scoped to the window and key of the current * trigger invocation. * - * @param name A unique key for the state. - * @param defaultState The default value of the state. + * @param name The name of the key/value state. + * @param stateType The class of the type that is stored in the state. Used to generate + * serializers for managed memory and checkpointing. + * @param defaultState The default state value, returned when the state is accessed and + * no value has yet been set for the key. May be null. + * + * @param <S> The type of the state. + * @return The partitioned state object. + * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the + * function (function is not part of a KeyedStream).
+ */ + @Deprecated + <S extends Serializable> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState); + + + /** + * Retrieves a {@link ValueState} object that can be used to interact with + * fault-tolerant state that is scoped to the window and key of the current + * trigger invocation. + * + * @param name The name of the key/value state. + * @param stateType The type information for the type that is stored in the state. + * Used to create serializers for managed memory and checkpoints. + * @param defaultState The default state value, returned when the state is accessed and + * no value has yet been set for the key. May be null. + * + * @param <S> The type of the state. + * @return The partitioned state object. + * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the + * function (function is not part of a KeyedStream). */ - <S extends Serializable> ValueState<S> getKeyValueState(final String name, final S defaultState); + @Deprecated + <S extends Serializable> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState); } } http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java index e15de8e..30c40bb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java @@ -35,7 +35,7 @@ public class AccumulatingKeyedTimePanes<Type, 
Key, Result> extends AbstractKeyed private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = getListFactory(); - private final WindowFunction<Type, Result, Key, Window> function; + private final WindowFunction<Iterable<Type>, Result, Key, Window> function; /** * IMPORTANT: This value needs to start at one, so it is fresher than the value that new entries have (zero) */ @@ -43,7 +43,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed // ------------------------------------------------------------------------ - public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, WindowFunction<Type, Result, Key, Window> function) { + public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, WindowFunction<Iterable<Type>, Result, Key, Window> function) { this.keySelector = keySelector; this.function = function; } @@ -85,7 +85,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed static final class WindowFunctionTraversal<Key, Type, Result> implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> { - private final WindowFunction<Type, Result, Key, Window> function; + private final WindowFunction<Iterable<Type>, Result, Key, Window> function; private final UnionIterator<Type> unionIterator; @@ -98,7 +98,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed private Key currentKey; - WindowFunctionTraversal(WindowFunction<Type, Result, Key, Window> function, TimeWindow window, + WindowFunctionTraversal(WindowFunction<Iterable<Type>, Result, Key, Window> function, TimeWindow window, Collector<Result> out, AbstractStreamOperator<Result> contextOperator) { this.function = function; this.out = out; http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java ---------------------------------------------------------------------- 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java index 7a7d04c..da64df8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java @@ -32,13 +32,13 @@ import java.util.ArrayList; public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> - extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, ArrayList<IN>, WindowFunction<IN, OUT, KEY, TimeWindow>> { + extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, ArrayList<IN>, WindowFunction<Iterable<IN>, OUT, KEY, TimeWindow>> { private static final long serialVersionUID = 7305948082830843475L; public AccumulatingProcessingTimeWindowOperator( - WindowFunction<IN, OUT, KEY, TimeWindow> function, + WindowFunction<Iterable<IN>, OUT, KEY, TimeWindow> function, KeySelector<IN, KEY> keySelector, TypeSerializer<KEY> keySerializer, TypeSerializer<IN> valueSerializer, @@ -52,7 +52,7 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> @Override protected AccumulatingKeyedTimePanes<IN, KEY, OUT> createPanes(KeySelector<IN, KEY> keySelector, Function function) { @SuppressWarnings("unchecked") - WindowFunction<IN, OUT, KEY, Window> windowFunction = (WindowFunction<IN, OUT, KEY, Window>) function; + WindowFunction<Iterable<IN>, OUT, KEY, Window> windowFunction = (WindowFunction<Iterable<IN>, OUT, KEY, Window>) function; return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java index 1bb451a..73972e6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java @@ -47,7 +47,7 @@ public class EvictingNonKeyedWindowOperator<IN, OUT, W extends Window> extends N public EvictingNonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssigner, TypeSerializer<W> windowSerializer, WindowBufferFactory<? super IN, ? extends EvictingWindowBuffer<IN>> windowBufferFactory, - AllWindowFunction<IN, OUT, W> windowFunction, + AllWindowFunction<Iterable<IN>, OUT, W> windowFunction, Trigger<? super IN, ? super W> trigger, Evictor<? super IN, ? 
super W> evictor) { super(windowAssigner, windowSerializer, windowBufferFactory, windowFunction, trigger); http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java index ad43812..f163de1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java @@ -18,15 +18,22 @@ package org.apache.flink.streaming.runtime.operators.windowing; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.Iterables; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.MergingState; +import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; -import org.apache.flink.streaming.runtime.operators.windowing.buffers.EvictingWindowBuffer; -import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory; import org.apache.flink.streaming.api.windowing.evictors.Evictor; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.windows.Window; +import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.util.Collection; import static java.util.Objects.requireNonNull; @@ -42,42 +49,97 @@ import static java.util.Objects.requireNonNull; * @param <OUT> The type of elements emitted by the {@code WindowFunction}. * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns. */ -public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends WindowOperator<K, IN, OUT, W> { +public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends WindowOperator<K, IN, Iterable<IN>, OUT, W> { private static final long serialVersionUID = 1L; private final Evictor<? super IN, ? super W> evictor; + private final StateDescriptor<? extends ListState<StreamRecord<IN>>> windowStateDescriptor; + public EvictingWindowOperator(WindowAssigner<? super IN, W> windowAssigner, - TypeSerializer<W> windowSerializer, - KeySelector<IN, K> keySelector, - TypeSerializer<K> keySerializer, - WindowBufferFactory<? super IN, ? extends EvictingWindowBuffer<IN>> windowBufferFactory, - WindowFunction<IN, OUT, K, W> windowFunction, - Trigger<? super IN, ? super W> trigger, - Evictor<? super IN, ? super W> evictor) { - super(windowAssigner, windowSerializer, keySelector, keySerializer, windowBufferFactory, windowFunction, trigger); + TypeSerializer<W> windowSerializer, + KeySelector<IN, K> keySelector, + TypeSerializer<K> keySerializer, + StateDescriptor<? extends ListState<StreamRecord<IN>>> windowStateDescriptor, + WindowFunction<Iterable<IN>, OUT, K, W> windowFunction, + Trigger<? super IN, ? super W> trigger, + Evictor<? super IN, ? 
super W> evictor) { + super(windowAssigner, windowSerializer, keySelector, keySerializer, null, windowFunction, trigger); this.evictor = requireNonNull(evictor); + this.windowStateDescriptor = windowStateDescriptor; } + @Override - @SuppressWarnings("unchecked, rawtypes") - protected void emitWindow(Context context) throws Exception { - timestampedCollector.setTimestamp(context.window.maxTimestamp()); - EvictingWindowBuffer<IN> windowBuffer = (EvictingWindowBuffer<IN>) context.windowBuffer; - - int toEvict = 0; - if (windowBuffer.size() > 0) { - // need some type trickery here... - toEvict = evictor.evict((Iterable) windowBuffer.getElements(), windowBuffer.size(), context.window); + @SuppressWarnings("unchecked") + public final void processElement(StreamRecord<IN> element) throws Exception { + Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp()); + + K key = (K) getStateBackend().getCurrentKey(); + + for (W window: elementWindows) { + + ListState<StreamRecord<IN>> windowState = getPartitionedState(window, windowSerializer, + windowStateDescriptor); + + windowState.add(element); + + context.key = key; + context.window = window; + Trigger.TriggerResult triggerResult = context.onElement(element); + + processTriggerResult(triggerResult, key, window); } + } - windowBuffer.removeElements(toEvict); + @Override + @SuppressWarnings("unchecked,rawtypes") + protected void processTriggerResult(Trigger.TriggerResult triggerResult, K key, W window) throws Exception { + if (!triggerResult.isFire() && !triggerResult.isPurge()) { + // do nothing + return; + } - userFunction.apply(context.key, - context.window, - context.windowBuffer.getUnpackedElements(), - timestampedCollector); + if (triggerResult.isFire()) { + timestampedCollector.setTimestamp(window.maxTimestamp()); + + setKeyContext(key); + + ListState<StreamRecord<IN>> windowState = getPartitionedState(window, windowSerializer, + windowStateDescriptor); + + 
Iterable<StreamRecord<IN>> contents = windowState.get(); + + // Work around type system restrictions... + int toEvict = evictor.evict((Iterable) contents, Iterables.size(contents), context.window); + + FluentIterable<IN> projectedContents = FluentIterable + .from(contents) + .skip(toEvict) + .transform(new Function<StreamRecord<IN>, IN>() { + @Override + public IN apply(StreamRecord<IN> input) { + return input.getValue(); + } + }); + userFunction.apply(context.key, context.window, projectedContents, timestampedCollector); + + if (triggerResult.isPurge()) { + windowState.clear(); + } else { + // we have to clear the state and set the elements that remain after eviction + windowState.clear(); + for (StreamRecord<IN> rec: FluentIterable.from(contents).skip(toEvict)) { + windowState.add(rec); + } + } + } else if (triggerResult.isPurge()) { + setKeyContext(key); + ListState<StreamRecord<IN>> windowState = getPartitionedState(window, windowSerializer, + windowStateDescriptor); + windowState.clear(); + } } @Override @@ -95,4 +157,11 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window public Evictor<? super IN, ? super W> getEvictor() { return evictor; } + + @Override + @VisibleForTesting + @SuppressWarnings("unchecked, rawtypes") + public StateDescriptor<? extends MergingState<IN, Iterable<IN>>> getStateDescriptor() { + return (StateDescriptor<? extends MergingState<IN, Iterable<IN>>>) windowStateDescriptor; + } }
