[FLINK-4997] [streaming] Introduce ProcessWindowFunction
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1dcb2dcd Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1dcb2dcd Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1dcb2dcd Branch: refs/heads/master Commit: 1dcb2dcd8969941988a4fc7e5488e9272dfd507e Parents: 82db667 Author: Ventura Del Monte <[email protected]> Authored: Wed Nov 23 18:00:23 2016 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Fri Feb 17 17:15:51 2017 +0100 ---------------------------------------------------------------------- .../api/datastream/WindowedStream.java | 381 ++++++++++++++--- .../FoldApplyProcessWindowFunction.java | 120 ++++++ .../windowing/ProcessWindowFunction.java | 61 +++ .../ReduceApplyProcessWindowFunction.java | 80 ++++ .../windowing/RichProcessWindowFunction.java | 85 ++++ .../windowing/AccumulatingKeyedTimePanes.java | 12 +- ...ccumulatingProcessingTimeWindowOperator.java | 16 +- .../InternalIterableProcessWindowFunction.java | 63 +++ ...nternalSingleValueProcessWindowFunction.java | 66 +++ .../FoldApplyProcessWindowFunctionTest.java | 155 +++++++ .../operators/FoldApplyWindowFunctionTest.java | 28 +- .../functions/InternalWindowFunctionTest.java | 101 ++++- ...AlignedProcessingTimeWindowOperatorTest.java | 419 ++++++++++++++++++- .../operators/windowing/WindowOperatorTest.java | 177 ++++++++ .../streaming/runtime/WindowFoldITCase.java | 78 ++++ 15 files changed, 1738 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index 04da04d..45eaae5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.datastream; +import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.Public; import org.apache.flink.api.common.functions.AggregateFunction; @@ -39,8 +40,11 @@ import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction; import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator; import org.apache.flink.streaming.api.functions.aggregation.SumAggregator; import org.apache.flink.streaming.api.functions.windowing.AggregateApplyWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessWindowFunction; import org.apache.flink.streaming.api.functions.windowing.FoldApplyWindowFunction; import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.ReduceApplyProcessWindowFunction; import org.apache.flink.streaming.api.functions.windowing.ReduceApplyWindowFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -60,9 +64,12 @@ import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator; import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator; import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator; 
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction; import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -360,6 +367,98 @@ public class WindowedStream<T, K, W extends Window> { return input.transform(opName, resultType, operator); } + + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * <p>Arriving data is incrementally aggregated using the given reducer. + * + * @param reduceFunction The reduce function that is used for incremental aggregation. + * @param function The window function. + * @return The data stream that is the result of applying the window function to the window. + */ + @PublicEvolving + public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function) { + TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( + function, ProcessWindowFunction.class, true, true, input.getType(), null, false); + + return reduce(reduceFunction, function, resultType); + } + + + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. 
The output of the window function is + * interpreted as a regular non-windowed stream. + * + * <p>Arriving data is incrementally aggregated using the given reducer. + * + * @param reduceFunction The reduce function that is used for incremental aggregation. + * @param function The window function. + * @param resultType Type information for the result type of the window function + * @return The data stream that is the result of applying the window function to the window. + */ + @Internal + public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function, TypeInformation<R> resultType) { + if (reduceFunction instanceof RichFunction) { + throw new UnsupportedOperationException("ReduceFunction of apply can not be a RichFunction."); + } + //clean the closures + function = input.getExecutionEnvironment().clean(function); + reduceFunction = input.getExecutionEnvironment().clean(reduceFunction); + + String callLocation = Utils.getCallLocationName(); + String udfName = "WindowedStream." 
+ callLocation; + + String opName; + KeySelector<T, K> keySel = input.getKeySelector(); + + OneInputStreamOperator<T, R> operator; + + if (evictor != null) { + @SuppressWarnings({"unchecked", "rawtypes"}) + TypeSerializer<StreamRecord<T>> streamRecordSerializer = + (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig())); + + ListStateDescriptor<StreamRecord<T>> stateDesc = + new ListStateDescriptor<>("window-contents", streamRecordSerializer); + + opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")"; + + operator = + new EvictingWindowOperator<>(windowAssigner, + windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), + keySel, + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + stateDesc, + new InternalIterableProcessWindowFunction<>(new ReduceApplyProcessWindowFunction<>(reduceFunction, function)), + trigger, + evictor, + allowedLateness); + + } else { + ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents", + reduceFunction, + input.getType().createSerializer(getExecutionEnvironment().getConfig())); + + opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")"; + + operator = + new WindowOperator<>(windowAssigner, + windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), + keySel, + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + stateDesc, + new InternalSingleValueProcessWindowFunction<>(function), + trigger, + allowedLateness); + } + + return input.transform(opName, resultType, operator); + } + // ------------------------------------------------------------------------ // Fold Function // ------------------------------------------------------------------------ @@ -510,6 +609,117 @@ public class WindowedStream<T, K, W extends Window> { return 
input.transform(opName, resultType, operator); } + + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * <p>Arriving data is incrementally aggregated using the given fold function. + * + * @param initialValue The initial value of the fold. + * @param foldFunction The fold function that is used for incremental aggregation. + * @param windowFunction The window function. + * @return The data stream that is the result of applying the window function to the window. + */ + @PublicEvolving + public <R, ACC> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessWindowFunction<ACC, R, K, W> windowFunction) { + if (foldFunction instanceof RichFunction) { + throw new UnsupportedOperationException("FoldFunction can not be a RichFunction."); + } + + TypeInformation<ACC> foldResultType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(), + Utils.getCallLocationName(), true); + + TypeInformation<R> windowResultType = TypeExtractor.getUnaryOperatorReturnType( + windowFunction, ProcessWindowFunction.class, true, true, foldResultType, Utils.getCallLocationName(), false); + + return fold(initialValue, foldFunction, windowFunction, foldResultType, windowResultType); + } + + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * <p>Arriving data is incrementally aggregated using the given fold function. + * + * @param initialValue the initial value to be passed to the first invocation of the fold function + * @param foldFunction The fold function. + * @param foldResultType The result type of the fold function. 
+ * @param windowFunction The process window function. + * @param windowResultType The process window function result type. + * @return The data stream that is the result of applying the fold function to the window. + */ + @Internal + public <R, ACC> SingleOutputStreamOperator<R> fold( + ACC initialValue, + FoldFunction<T, ACC> foldFunction, + ProcessWindowFunction<ACC, R, K, W> windowFunction, + TypeInformation<ACC> foldResultType, + TypeInformation<R> windowResultType) { + if (foldFunction instanceof RichFunction) { + throw new UnsupportedOperationException("FoldFunction can not be a RichFunction."); + } + if (windowAssigner instanceof MergingWindowAssigner) { + throw new UnsupportedOperationException("Fold cannot be used with a merging WindowAssigner."); + } + + //clean the closures + windowFunction = input.getExecutionEnvironment().clean(windowFunction); + foldFunction = input.getExecutionEnvironment().clean(foldFunction); + + String callLocation = Utils.getCallLocationName(); + String udfName = "WindowedStream." 
+ callLocation; + + String opName; + KeySelector<T, K> keySel = input.getKeySelector(); + + OneInputStreamOperator<T, R> operator; + + if (evictor != null) { + @SuppressWarnings({"unchecked", "rawtypes"}) + TypeSerializer<StreamRecord<T>> streamRecordSerializer = + (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig())); + + ListStateDescriptor<StreamRecord<T>> stateDesc = + new ListStateDescriptor<>("window-contents", streamRecordSerializer); + + opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")"; + + operator = + new EvictingWindowOperator<>(windowAssigner, + windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), + keySel, + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + stateDesc, + new InternalIterableProcessWindowFunction<>(new FoldApplyProcessWindowFunction<>(initialValue, foldFunction, windowFunction, foldResultType)), + trigger, + evictor, + allowedLateness); + + } else { + FoldingStateDescriptor<T, ACC> stateDesc = new FoldingStateDescriptor<>("window-contents", + initialValue, + foldFunction, + foldResultType.createSerializer(getExecutionEnvironment().getConfig())); + + opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")"; + + operator = + new WindowOperator<>(windowAssigner, + windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), + keySel, + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + stateDesc, + new InternalSingleValueProcessWindowFunction<>(windowFunction), + trigger, + allowedLateness); + } + + return input.transform(opName, windowResultType, operator); + } + // ------------------------------------------------------------------------ // Aggregation Function // ------------------------------------------------------------------------ @@ -733,11 +943,53 @@ 
public class WindowedStream<T, K, W extends Window> { * @return The data stream that is the result of applying the window function to the window. */ public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) { - - //clean the closure + String callLocation = Utils.getCallLocationName(); function = input.getExecutionEnvironment().clean(function); + return apply(new InternalIterableWindowFunction<>(function), resultType, callLocation); + } + + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * <p> + * Note that this function requires that all data in the windows is buffered until the window + * is evaluated, as the function provides no means of incremental aggregation. + * + * @param function The window function. + * @return The data stream that is the result of applying the window function to the window. + */ + @PublicEvolving + public <R> SingleOutputStreamOperator<R> process(ProcessWindowFunction<T, R, K, W> function) { + TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( + function, ProcessWindowFunction.class, true, true, getInputType(), null, false); + + return process(function, resultType); + } + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * <p> + * Note that this function requires that all data in the windows is buffered until the window + * is evaluated, as the function provides no means of incremental aggregation. + * + * @param function The window function. 
+ * @param resultType Type information for the result type of the window function + * @return The data stream that is the result of applying the window function to the window. + */ + @Internal + public <R> SingleOutputStreamOperator<R> process(ProcessWindowFunction<T, R, K, W> function, TypeInformation<R> resultType) { String callLocation = Utils.getCallLocationName(); + function = input.getExecutionEnvironment().clean(function); + return apply(new InternalIterableProcessWindowFunction<>(function), resultType, callLocation); + } + + private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> resultType, String callLocation) { + String udfName = "WindowedStream." + callLocation; SingleOutputStreamOperator<R> result = createFastTimeOperatorIfValid(function, resultType, udfName); @@ -767,7 +1019,7 @@ public class WindowedStream<T, K, W extends Window> { keySel, input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), stateDesc, - new InternalIterableWindowFunction<>(function), + function, trigger, evictor, allowedLateness); @@ -784,7 +1036,7 @@ public class WindowedStream<T, K, W extends Window> { keySel, input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), stateDesc, - new InternalIterableWindowFunction<>(function), + function, trigger, allowedLateness, legacyWindowOpType); @@ -1211,7 +1463,7 @@ public class WindowedStream<T, K, W extends Window> { } private <R> SingleOutputStreamOperator<R> createFastTimeOperatorIfValid( - Function function, + ReduceFunction<?> function, TypeInformation<R> resultType, String functionName) { @@ -1222,30 +1474,18 @@ public class WindowedStream<T, K, W extends Window> { String opName = "Fast " + timeWindows + " of " + functionName; - if (function instanceof ReduceFunction) { - @SuppressWarnings("unchecked") - ReduceFunction<T> reducer = (ReduceFunction<T>) function; - - @SuppressWarnings("unchecked") - OneInputStreamOperator<T, R> op 
= (OneInputStreamOperator<T, R>) - new AggregatingProcessingTimeWindowOperator<>( - reducer, input.getKeySelector(), - input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), - input.getType().createSerializer(getExecutionEnvironment().getConfig()), - windowLength, windowSlide); - return input.transform(opName, resultType, op); - } - else if (function instanceof WindowFunction) { - @SuppressWarnings("unchecked") - WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function; - - OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>( - wf, input.getKeySelector(), - input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), - input.getType().createSerializer(getExecutionEnvironment().getConfig()), - windowLength, windowSlide); - return input.transform(opName, resultType, op); - } + @SuppressWarnings("unchecked") + ReduceFunction<T> reducer = (ReduceFunction<T>) function; + + @SuppressWarnings("unchecked") + OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>) + new AggregatingProcessingTimeWindowOperator<>( + reducer, input.getKeySelector(), + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + input.getType().createSerializer(getExecutionEnvironment().getConfig()), + windowLength, windowSlide); + return input.transform(opName, resultType, op); + } else if (windowAssigner.getClass() == TumblingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) { TumblingAlignedProcessingTimeWindows timeWindows = (TumblingAlignedProcessingTimeWindows) windowAssigner; final long windowLength = timeWindows.getSize(); @@ -1253,36 +1493,69 @@ public class WindowedStream<T, K, W extends Window> { String opName = "Fast " + timeWindows + " of " + functionName; - if (function instanceof ReduceFunction) { - @SuppressWarnings("unchecked") - ReduceFunction<T> reducer = (ReduceFunction<T>) function; - - @SuppressWarnings("unchecked") - 
OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>) - new AggregatingProcessingTimeWindowOperator<>( - reducer, - input.getKeySelector(), - input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), - input.getType().createSerializer(getExecutionEnvironment().getConfig()), - windowLength, windowSlide); - return input.transform(opName, resultType, op); - } - else if (function instanceof WindowFunction) { - @SuppressWarnings("unchecked") - WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function; - - OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>( - wf, input.getKeySelector(), - input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), - input.getType().createSerializer(getExecutionEnvironment().getConfig()), - windowLength, windowSlide); - return input.transform(opName, resultType, op); - } + @SuppressWarnings("unchecked") + ReduceFunction<T> reducer = (ReduceFunction<T>) function; + + @SuppressWarnings("unchecked") + OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>) + new AggregatingProcessingTimeWindowOperator<>( + reducer, + input.getKeySelector(), + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + input.getType().createSerializer(getExecutionEnvironment().getConfig()), + windowLength, windowSlide); + return input.transform(opName, resultType, op); + } + + return null; + } + + private <R> SingleOutputStreamOperator<R> createFastTimeOperatorIfValid( + InternalWindowFunction<Iterable<T>, R, K, W> function, + TypeInformation<R> resultType, + String functionName) { + + if (windowAssigner.getClass() == SlidingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) { + SlidingAlignedProcessingTimeWindows timeWindows = (SlidingAlignedProcessingTimeWindows) windowAssigner; + final long windowLength = timeWindows.getSize(); + final long windowSlide = timeWindows.getSlide(); + + String opName = "Fast 
" + timeWindows + " of " + functionName; + + @SuppressWarnings("unchecked") + InternalWindowFunction<Iterable<T>, R, K, TimeWindow> timeWindowFunction = + (InternalWindowFunction<Iterable<T>, R, K, TimeWindow>) function; + + OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>( + timeWindowFunction, input.getKeySelector(), + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + input.getType().createSerializer(getExecutionEnvironment().getConfig()), + windowLength, windowSlide); + return input.transform(opName, resultType, op); + } else if (windowAssigner.getClass() == TumblingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) { + TumblingAlignedProcessingTimeWindows timeWindows = (TumblingAlignedProcessingTimeWindows) windowAssigner; + final long windowLength = timeWindows.getSize(); + final long windowSlide = timeWindows.getSize(); + + String opName = "Fast " + timeWindows + " of " + functionName; + + @SuppressWarnings("unchecked") + InternalWindowFunction<Iterable<T>, R, K, TimeWindow> timeWindowFunction = + (InternalWindowFunction<Iterable<T>, R, K, TimeWindow>) function; + + + OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>( + timeWindowFunction, input.getKeySelector(), + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + input.getType().createSerializer(getExecutionEnvironment().getConfig()), + windowLength, windowSlide); + return input.transform(opName, resultType, op); } return null; } + public StreamExecutionEnvironment getExecutionEnvironment() { return input.getExecutionEnvironment(); } http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java new file mode 100644 index 0000000..e1bc759 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.functions.windowing; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.Collector; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Collections; + +@Internal +public class FoldApplyProcessWindowFunction<K, W extends Window, T, ACC, R> + extends RichProcessWindowFunction<T, R, K, W> + implements OutputTypeConfigurable<R> { + + private static final long serialVersionUID = 1L; + + private final FoldFunction<T, ACC> foldFunction; + private final ProcessWindowFunction<ACC, R, K, W> windowFunction; + + private byte[] serializedInitialValue; + private TypeSerializer<ACC> accSerializer; + private final TypeInformation<ACC> accTypeInformation; + private transient ACC initialValue; + + public FoldApplyProcessWindowFunction(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessWindowFunction<ACC, R, K, W> windowFunction, TypeInformation<ACC> accTypeInformation) { + this.windowFunction = windowFunction; + this.foldFunction = foldFunction; + this.initialValue = initialValue; + this.accTypeInformation = accTypeInformation; + } + + @Override + public void open(Configuration configuration) throws Exception { + 
FunctionUtils.openFunction(this.windowFunction, configuration); + + if (serializedInitialValue == null) { + throw new RuntimeException("No initial value was serialized for the fold " + + "window function. Probably the setOutputType method was not called."); + } + + ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue); + DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais); + initialValue = accSerializer.deserialize(in); + } + + @Override + public void close() throws Exception { + FunctionUtils.closeFunction(this.windowFunction); + } + + @Override + public void setRuntimeContext(RuntimeContext t) { + super.setRuntimeContext(t); + + FunctionUtils.setFunctionRuntimeContext(this.windowFunction, t); + } + + @Override + public void process(K key, final Context context, Iterable<T> values, Collector<R> out) throws Exception { + ACC result = accSerializer.copy(initialValue); + + for (T val : values) { + result = foldFunction.fold(result, val); + } + + windowFunction.process(key, windowFunction.new Context() { + @Override + public W window() { + return context.window(); + } + }, Collections.singletonList(result), out); + } + + @Override + public void setOutputType(TypeInformation<R> outTypeInfo, ExecutionConfig executionConfig) { + accSerializer = accTypeInformation.createSerializer(executionConfig); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos); + + try { + accSerializer.serialize(initialValue, out); + } catch (IOException ioe) { + throw new RuntimeException("Unable to serialize initial value of type " + + initialValue.getClass().getSimpleName() + " of fold window function.", ioe); + } + + serializedInitialValue = baos.toByteArray(); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java 
---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java new file mode 100644 index 0000000..9c48e24 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.windowing; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.Collector; + +/** + * Base abstract class for functions that are evaluated over keyed (grouped) windows using a context + * for retrieving extra information. + * + * @param <IN> The type of the input value. + * @param <OUT> The type of the output value. + * @param <KEY> The type of the key. + * @param <W> The type of {@code Window} that this window function can be applied on. 
+ */ +@PublicEvolving +public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function { + + private static final long serialVersionUID = 1L; + + /** + * Evaluates the window and outputs none or several elements. + * + * @param key The key for which this window is evaluated. + * @param context The context in which the window is being evaluated. + * @param elements The elements in the window being evaluated. + * @param out A collector for emitting elements. + * + * @throws Exception The function may throw exceptions to fail the program and trigger recovery. + */ + public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception; + + /** + * The context holding window metadata + */ + public abstract class Context { + /** + * @return The window that is being evaluated. + */ + public abstract W window(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java new file mode 100644 index 0000000..9ea1fdf --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.windowing; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.Collector; + +import java.util.Collections; + +@Internal +public class ReduceApplyProcessWindowFunction<K, W extends Window, T, R> + extends RichProcessWindowFunction<T, R, K, W> { + + private static final long serialVersionUID = 1L; + + private final ReduceFunction<T> reduceFunction; + private final ProcessWindowFunction<T, R, K, W> windowFunction; + + public ReduceApplyProcessWindowFunction(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> windowFunction) { + this.windowFunction = windowFunction; + this.reduceFunction = reduceFunction; + } + + @Override + public void process(K k, final Context context, Iterable<T> input, Collector<R> out) throws Exception { + + T curr = null; + for (T val: input) { + if (curr == null) { + curr = val; + } else { + curr = reduceFunction.reduce(curr, val); + } + } + windowFunction.process(k, windowFunction.new Context() { + @Override + public W window() { + return context.window(); + } + }, 
Collections.singletonList(curr), out); + } + + @Override + public void open(Configuration configuration) throws Exception { + FunctionUtils.openFunction(this.windowFunction, configuration); + } + + @Override + public void close() throws Exception { + FunctionUtils.closeFunction(this.windowFunction); + } + + @Override + public void setRuntimeContext(RuntimeContext t) { + super.setRuntimeContext(t); + + FunctionUtils.setFunctionRuntimeContext(this.windowFunction, t); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java new file mode 100644 index 0000000..ac55bc6 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.windowing; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.IterationRuntimeContext; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.windowing.windows.Window; + +/** + * Base rich abstract class for functions that are evaluated over keyed (grouped) windows using a context + * for passing extra information. + * + * @param <IN> The type of the input value. + * @param <OUT> The type of the output value. + * @param <KEY> The type of the key. + * @param <W> The type of {@code Window} that this window function can be applied on. + */ +@PublicEvolving +public abstract class RichProcessWindowFunction<IN, OUT, KEY, W extends Window> + extends ProcessWindowFunction<IN, OUT, KEY, W> + implements RichFunction { + + private static final long serialVersionUID = 1L; + + + // -------------------------------------------------------------------------------------------- + // Runtime context access + // -------------------------------------------------------------------------------------------- + + private transient RuntimeContext runtimeContext; + + @Override + public void setRuntimeContext(RuntimeContext t) { + this.runtimeContext = t; + } + + @Override + public RuntimeContext getRuntimeContext() { + if (this.runtimeContext != null) { + return this.runtimeContext; + } else { + throw new IllegalStateException("The runtime context has not been initialized."); + } + } + + @Override + public IterationRuntimeContext getIterationRuntimeContext() { + if (this.runtimeContext == null) { + throw new IllegalStateException("The runtime context has not been initialized."); + } else if (this.runtimeContext instanceof IterationRuntimeContext) { + return (IterationRuntimeContext) this.runtimeContext; + } else { + throw new 
IllegalStateException("This stub is not part of an iteration step function."); + } + } + + // -------------------------------------------------------------------------------------------- + // Default life cycle methods + // -------------------------------------------------------------------------------------------- + + @Override + public void open(Configuration parameters) throws Exception {} + + @Override + public void close() throws Exception {} +} http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java index a252ece..87c5aca 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java @@ -20,8 +20,8 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.annotation.Internal; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction; import org.apache.flink.util.UnionIterator; -import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.api.windowing.windows.Window; @@ -36,7 +36,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed private final 
KeyMap.LazyFactory<ArrayList<Type>> listFactory = getListFactory(); - private final WindowFunction<Type, Result, Key, Window> function; + private final InternalWindowFunction<Iterable<Type>, Result, Key, Window> function; /** * IMPORTANT: This value needs to start at one, so it is fresher than the value that new entries have (zero) */ @@ -44,7 +44,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed // ------------------------------------------------------------------------ - public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, WindowFunction<Type, Result, Key, Window> function) { + public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, InternalWindowFunction<Iterable<Type>, Result, Key, Window> function) { this.keySelector = keySelector; this.function = function; } @@ -59,7 +59,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed } @Override - public void evaluateWindow(Collector<Result> out, TimeWindow window, + public void evaluateWindow(Collector<Result> out, final TimeWindow window, AbstractStreamOperator<Result> operator) throws Exception { if (previousPanes.isEmpty()) { @@ -86,7 +86,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed static final class WindowFunctionTraversal<Key, Type, Result> implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> { - private final WindowFunction<Type, Result, Key, Window> function; + private final InternalWindowFunction<Iterable<Type>, Result, Key, Window> function; private final UnionIterator<Type> unionIterator; @@ -99,7 +99,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed private Key currentKey; - WindowFunctionTraversal(WindowFunction<Type, Result, Key, Window> function, TimeWindow window, + WindowFunctionTraversal(InternalWindowFunction<Iterable<Type>, Result, Key, Window> function, TimeWindow window, Collector<Result> out, 
AbstractStreamOperator<Result> contextOperator) { this.function = function; this.out = out; http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java index 7adaf13..094b34d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java @@ -23,22 +23,22 @@ import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.runtime.state.ArrayListSerializer; -import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction; import java.util.ArrayList; @Internal @Deprecated public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> - extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, ArrayList<IN>, WindowFunction<IN, OUT, KEY, TimeWindow>> { + extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, ArrayList<IN>, InternalWindowFunction<Iterable<IN>, OUT, KEY, TimeWindow>> { private static final long serialVersionUID = 7305948082830843475L; - + public 
AccumulatingProcessingTimeWindowOperator( - WindowFunction<IN, OUT, KEY, TimeWindow> function, + InternalWindowFunction<Iterable<IN>, OUT, KEY, TimeWindow> function, KeySelector<IN, KEY> keySelector, TypeSerializer<KEY> keySerializer, TypeSerializer<IN> valueSerializer, @@ -46,14 +46,14 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> long windowSlide) { super(function, keySelector, keySerializer, - new ArrayListSerializer<IN>(valueSerializer), windowLength, windowSlide); + new ArrayListSerializer<>(valueSerializer), windowLength, windowSlide); } @Override protected AccumulatingKeyedTimePanes<IN, KEY, OUT> createPanes(KeySelector<IN, KEY> keySelector, Function function) { @SuppressWarnings("unchecked") - WindowFunction<IN, OUT, KEY, Window> windowFunction = (WindowFunction<IN, OUT, KEY, Window>) function; - + InternalWindowFunction<Iterable<IN>, OUT, KEY, Window> windowFunction = (InternalWindowFunction<Iterable<IN>, OUT, KEY, Window>) function; + return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java new file mode 100644 index 0000000..de516a5 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing.functions; + +import org.apache.flink.api.common.functions.IterationRuntimeContext; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.Collector; + +/** + * Internal window function for wrapping a {@link ProcessWindowFunction} that takes an {@code Iterable} + * when the window state also is an {@code Iterable}. 
+ */ +public final class InternalIterableProcessWindowFunction<IN, OUT, KEY, W extends Window> + extends WrappingFunction<ProcessWindowFunction<IN, OUT, KEY, W>> + implements InternalWindowFunction<Iterable<IN>, OUT, KEY, W> { + + private static final long serialVersionUID = 1L; + + public InternalIterableProcessWindowFunction(ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction) { + super(wrappedFunction); + } + + @Override + public void apply(KEY key, final W window, Iterable<IN> input, Collector<OUT> out) throws Exception { + ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = this.wrappedFunction; + ProcessWindowFunction<IN, OUT, KEY, W>.Context context = wrappedFunction.new Context() { + @Override + public W window() { + return window; + } + }; + + wrappedFunction.process(key, context, input, out); + } + + @Override + public RuntimeContext getRuntimeContext() { + throw new RuntimeException("This should never be called."); + } + + @Override + public IterationRuntimeContext getIterationRuntimeContext() { + throw new RuntimeException("This should never be called."); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java new file mode 100644 index 0000000..b28c208 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing.functions; + +import org.apache.flink.api.common.functions.IterationRuntimeContext; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.WindowFunction; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.Collector; + +import java.util.Collections; + +/** + * Internal window function for wrapping a {@link ProcessWindowFunction} that takes an {@code Iterable} + * when the window state is a single value.
+ */ +public final class InternalSingleValueProcessWindowFunction<IN, OUT, KEY, W extends Window> + extends WrappingFunction<ProcessWindowFunction<IN, OUT, KEY, W>> + implements InternalWindowFunction<IN, OUT, KEY, W> { + + private static final long serialVersionUID = 1L; + + public InternalSingleValueProcessWindowFunction(ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction) { + super(wrappedFunction); + } + + @Override + public void apply(KEY key, final W window, IN input, Collector<OUT> out) throws Exception { + ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = this.wrappedFunction; + ProcessWindowFunction<IN, OUT, KEY, W>.Context context = wrappedFunction.new Context() { + @Override + public W window() { + return window; + } + }; + + wrappedFunction.process(key, context, Collections.singletonList(input), out); + } + + @Override + public RuntimeContext getRuntimeContext() { + throw new RuntimeException("This should never be called."); + } + + @Override + public IterationRuntimeContext getIterationRuntimeContext() { + throw new RuntimeException("This should never be called."); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java new file mode 100644 index 0000000..af5c77a --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.functions.util.ListCollector; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.Utils; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.FoldApplyWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.WindowFunction; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamGraphGenerator; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; +import org.apache.flink.streaming.api.transformations.SourceTransformation; +import 
org.apache.flink.streaming.api.transformations.StreamTransformation; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction; +import org.apache.flink.util.Collector; +import org.junit.Test; +import org.junit.Assert; + +import java.util.ArrayList; +import java.util.List; + +public class FoldApplyProcessWindowFunctionTest { + + /** + * Tests that the FoldWindowFunction gets the output type serializer set by the + * StreamGraphGenerator and checks that the FoldWindowFunction computes the correct result. + */ + @Test + public void testFoldWindowFunctionOutputTypeConfigurable() throws Exception{ + StreamExecutionEnvironment env = new DummyStreamExecutionEnvironment(); + + List<StreamTransformation<?>> transformations = new ArrayList<>(); + + int initValue = 1; + + FoldApplyProcessWindowFunction<Integer, TimeWindow, Integer, Integer, Integer> foldWindowFunction = new FoldApplyProcessWindowFunction<>( + initValue, + new FoldFunction<Integer, Integer>() { + @Override + public Integer fold(Integer accumulator, Integer value) throws Exception { + return accumulator + value; + } + + }, + new ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>() { + @Override + public void process(Integer integer, + Context context, + Iterable<Integer> input, + Collector<Integer> out) throws Exception { + for (Integer in: input) { + out.collect(in); + } + } + }, + BasicTypeInfo.INT_TYPE_INFO + ); + + AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> windowOperator = new AccumulatingProcessingTimeWindowOperator<>( + new InternalIterableProcessWindowFunction<>(foldWindowFunction), + new KeySelector<Integer, Integer>() { + private static final long serialVersionUID = -7951310554369722809L; + + @Override + public Integer getKey(Integer value) 
throws Exception { + return value; + } + }, + IntSerializer.INSTANCE, + IntSerializer.INSTANCE, + 3000, + 3000 + ); + + SourceFunction<Integer> sourceFunction = new SourceFunction<Integer>(){ + + private static final long serialVersionUID = 8297735565464653028L; + + @Override + public void run(SourceContext<Integer> ctx) throws Exception { + + } + + @Override + public void cancel() { + + } + }; + + SourceTransformation<Integer> source = new SourceTransformation<>("", new StreamSource<>(sourceFunction), BasicTypeInfo.INT_TYPE_INFO, 1); + + transformations.add(new OneInputTransformation<>(source, "test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1)); + + StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations); + + List<Integer> result = new ArrayList<>(); + List<Integer> input = new ArrayList<>(); + List<Integer> expected = new ArrayList<>(); + + input.add(1); + input.add(2); + input.add(3); + + for (int value : input) { + initValue += value; + } + + expected.add(initValue); + + foldWindowFunction.process(0, foldWindowFunction.new Context() { + @Override + public TimeWindow window() { + return new TimeWindow(0, 1); + } + }, input, new ListCollector<>(result)); + + Assert.assertEquals(expected, result); + } + + public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment { + + @Override + public JobExecutionResult execute(String jobName) throws Exception { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java index 91ec427..fecd440 100644 --- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java @@ -35,6 +35,7 @@ import org.apache.flink.streaming.api.transformations.SourceTransformation; import org.apache.flink.streaming.api.transformations.StreamTransformation; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction; import org.apache.flink.util.Collector; import org.junit.Test; import org.junit.Assert; @@ -81,19 +82,20 @@ public class FoldApplyWindowFunctionTest { ); AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> windowOperator = new AccumulatingProcessingTimeWindowOperator<>( - foldWindowFunction, - new KeySelector<Integer, Integer>() { - private static final long serialVersionUID = -7951310554369722809L; - - @Override - public Integer getKey(Integer value) throws Exception { - return value; - } - }, - IntSerializer.INSTANCE, - IntSerializer.INSTANCE, - 3000, - 3000 + new InternalIterableWindowFunction<>( + foldWindowFunction), + new KeySelector<Integer, Integer>() { + private static final long serialVersionUID = -7951310554369722809L; + + @Override + public Integer getKey(Integer value) throws Exception { + return value; + } + }, + IntSerializer.INSTANCE, + IntSerializer.INSTANCE, + 3000, + 3000 ); SourceFunction<Integer> sourceFunction = new SourceFunction<Integer>(){ http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java index f3c3423..3c73035 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java @@ -25,12 +25,15 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils; import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunction; import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction; import org.apache.flink.util.Collector; import org.hamcrest.collection.IsIterableContainingInOrder; @@ -115,7 +118,48 @@ public class InternalWindowFunctionTest { Collector<String> c = 
(Collector<String>) mock(Collector.class); windowFunction.apply(42L, w, i, c); - verify(mock).apply(42L, w, i, c); + verify(mock).apply(eq(42L), eq(w), eq(i), eq(c)); + + // check close + windowFunction.close(); + verify(mock).close(); + } + + @SuppressWarnings("unchecked") + @Test + public void testInternalIterableProcessWindowFunction() throws Exception { + + ProcessWindowFunctionMock mock = mock(ProcessWindowFunctionMock.class); + InternalIterableProcessWindowFunction<Long, String, Long, TimeWindow> windowFunction = + new InternalIterableProcessWindowFunction<>(mock); + + // check setOutputType + TypeInformation<String> stringType = BasicTypeInfo.STRING_TYPE_INFO; + ExecutionConfig execConf = new ExecutionConfig(); + execConf.setParallelism(42); + + StreamingFunctionUtils.setOutputType(windowFunction, stringType, execConf); + verify(mock).setOutputType(stringType, execConf); + + // check open + Configuration config = new Configuration(); + + windowFunction.open(config); + verify(mock).open(config); + + // check setRuntimeContext + RuntimeContext rCtx = mock(RuntimeContext.class); + + windowFunction.setRuntimeContext(rCtx); + verify(mock).setRuntimeContext(rCtx); + + // check apply + TimeWindow w = mock(TimeWindow.class); + Iterable<Long> i = (Iterable<Long>)mock(Iterable.class); + Collector<String> c = (Collector<String>) mock(Collector.class); + + windowFunction.apply(42L, w, i, c); + verify(mock).process(eq(42L), (ProcessWindowFunctionMock.Context) anyObject(), eq(i), eq(c)); // check close windowFunction.close(); @@ -204,6 +248,59 @@ public class InternalWindowFunctionTest { verify(mock).close(); } + @SuppressWarnings("unchecked") + @Test + public void testInternalSingleValueProcessWindowFunction() throws Exception { + + ProcessWindowFunctionMock mock = mock(ProcessWindowFunctionMock.class); + InternalSingleValueProcessWindowFunction<Long, String, Long, TimeWindow> windowFunction = + new InternalSingleValueProcessWindowFunction<>(mock); + + // check 
setOutputType + TypeInformation<String> stringType = BasicTypeInfo.STRING_TYPE_INFO; + ExecutionConfig execConf = new ExecutionConfig(); + execConf.setParallelism(42); + + StreamingFunctionUtils.setOutputType(windowFunction, stringType, execConf); + verify(mock).setOutputType(stringType, execConf); + + // check open + Configuration config = new Configuration(); + + windowFunction.open(config); + verify(mock).open(config); + + // check setRuntimeContext + RuntimeContext rCtx = mock(RuntimeContext.class); + + windowFunction.setRuntimeContext(rCtx); + verify(mock).setRuntimeContext(rCtx); + + // check apply + TimeWindow w = mock(TimeWindow.class); + Collector<String> c = (Collector<String>) mock(Collector.class); + + windowFunction.apply(42L, w, 23L, c); + verify(mock).process(eq(42L), (ProcessWindowFunctionMock.Context) anyObject(), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c)); + + // check close + windowFunction.close(); + verify(mock).close(); + } + + public static class ProcessWindowFunctionMock + extends RichProcessWindowFunction<Long, String, Long, TimeWindow> + implements OutputTypeConfigurable<String> { + + private static final long serialVersionUID = 1L; + + @Override + public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig executionConfig) { } + + @Override + public void process(Long aLong, Context context, Iterable<Long> input, Collector<String> out) throws Exception { } + } + public static class WindowFunctionMock extends RichWindowFunction<Long, String, Long, TimeWindow> implements OutputTypeConfigurable<String> { @@ -214,7 +311,7 @@ public class InternalWindowFunctionTest { public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig executionConfig) { } @Override - public void apply(Long aLong, TimeWindow window, Iterable<Long> input, Collector<String> out) throws Exception { } + public void apply(Long aLong, TimeWindow w, Iterable<Long> input, Collector<String> out) throws 
Exception { } } public static class AllWindowFunctionMock http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java index 255a20f..508d2e1 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java @@ -36,8 +36,12 @@ import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.StreamTask; @@ 
-47,6 +51,9 @@ import org.apache.flink.util.Collector; import org.junit.After; import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; import java.util.ArrayList; import java.util.Arrays; @@ -64,10 +71,12 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @SuppressWarnings({"serial"}) +@PrepareForTest(InternalIterableWindowFunction.class) +@RunWith(PowerMockRunner.class) public class AccumulatingAlignedProcessingTimeWindowOperatorTest { @SuppressWarnings("unchecked") - private final WindowFunction<String, String, String, TimeWindow> mockFunction = mock(WindowFunction.class); + private final InternalIterableWindowFunction<String, String, String, TimeWindow> mockFunction = mock(InternalIterableWindowFunction.class); @SuppressWarnings("unchecked") private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class); @@ -79,26 +88,34 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { } }; - private final WindowFunction<Integer, Integer, Integer, TimeWindow> validatingIdentityFunction = - new WindowFunction<Integer, Integer, Integer, TimeWindow>() - { - @Override - public void apply(Integer key, - TimeWindow window, - Iterable<Integer> values, - Collector<Integer> out) { - for (Integer val : values) { - assertEquals(key, val); - out.collect(val); - } - } - }; + private final InternalIterableWindowFunction<Integer, Integer, Integer, TimeWindow> validatingIdentityFunction = + new InternalIterableWindowFunction<>(new WindowFunction<Integer, Integer, Integer, TimeWindow>() { + @Override + public void apply(Integer key, TimeWindow window, Iterable<Integer> values, Collector<Integer> out) throws Exception { + for (Integer val : values) { + assertEquals(key, val); + out.collect(val); + } + } + }); + + private final InternalIterableProcessWindowFunction<Integer, Integer, Integer, TimeWindow> 
validatingIdentityProcessFunction = + new InternalIterableProcessWindowFunction<>(new ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>() { + @Override + public void process(Integer key, Context context, Iterable<Integer> values, Collector<Integer> out) throws Exception { + for (Integer val : values) { + assertEquals(key, val); + out.collect(val); + } + } + }); // ------------------------------------------------------------------------ public AccumulatingAlignedProcessingTimeWindowOperatorTest() { ClosureCleaner.clean(identitySelector, false); ClosureCleaner.clean(validatingIdentityFunction, false); + ClosureCleaner.clean(validatingIdentityProcessFunction, false); } // ------------------------------------------------------------------------ @@ -281,6 +298,50 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { } @Test + public void testTumblingWindowWithProcessFunction() throws Exception { + try { + final int windowSize = 50; + + // tumbling window that triggers every 20 milliseconds + AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op = + new AccumulatingProcessingTimeWindowOperator<>( + validatingIdentityProcessFunction, identitySelector, + IntSerializer.INSTANCE, IntSerializer.INSTANCE, + windowSize, windowSize); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, BasicTypeInfo.INT_TYPE_INFO); + + testHarness.open(); + + final int numElements = 1000; + + long currentTime = 0; + + for (int i = 0; i < numElements; i++) { + testHarness.processElement(new StreamRecord<>(i)); + currentTime = currentTime + 10; + testHarness.setProcessingTime(currentTime); + } + + + List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords()); + assertEquals(numElements, result.size()); + + Collections.sort(result); + for (int i = 0; i < numElements; i++) { + assertEquals(i, result.get(i).intValue()); + } + + 
testHarness.close(); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test public void testSlidingWindow() throws Exception { // tumbling window that triggers every 20 milliseconds @@ -333,6 +394,58 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { } @Test + public void testSlidingWindowWithProcessFunction() throws Exception { + + // tumbling window that triggers every 20 milliseconds + AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op = + new AccumulatingProcessingTimeWindowOperator<>( + validatingIdentityProcessFunction, identitySelector, + IntSerializer.INSTANCE, IntSerializer.INSTANCE, 150, 50); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, BasicTypeInfo.INT_TYPE_INFO); + + testHarness.open(); + + final int numElements = 1000; + + long currentTime = 0; + + for (int i = 0; i < numElements; i++) { + testHarness.processElement(new StreamRecord<>(i)); + currentTime = currentTime + 10; + testHarness.setProcessingTime(currentTime); + } + + // get and verify the result + List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords()); + + // if we kept this running, each element would be in the result three times (for each slide). + // we are closing the window before the final panes are through three times, so we may have less + // elements. 
+ if (result.size() < numElements || result.size() > 3 * numElements) { + fail("Wrong number of results: " + result.size()); + } + + Collections.sort(result); + int lastNum = -1; + int lastCount = -1; + + for (int num : result) { + if (num == lastNum) { + lastCount++; + assertTrue(lastCount <= 3); + } + else { + lastNum = num; + lastCount = 1; + } + } + + testHarness.close(); + } + + @Test public void testTumblingWindowSingleElements() throws Exception { try { @@ -379,7 +492,55 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { fail(e.getMessage()); } } - + + @Test + public void testTumblingWindowSingleElementsWithProcessFunction() throws Exception { + + try { + + // tumbling window that triggers every 20 milliseconds + AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op = + new AccumulatingProcessingTimeWindowOperator<>( + validatingIdentityProcessFunction, identitySelector, + IntSerializer.INSTANCE, IntSerializer.INSTANCE, 50, 50); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, BasicTypeInfo.INT_TYPE_INFO); + + testHarness.open(); + + testHarness.setProcessingTime(0); + + testHarness.processElement(new StreamRecord<>(1)); + testHarness.processElement(new StreamRecord<>(2)); + + testHarness.setProcessingTime(50); + + testHarness.processElement(new StreamRecord<>(3)); + testHarness.processElement(new StreamRecord<>(4)); + testHarness.processElement(new StreamRecord<>(5)); + + testHarness.setProcessingTime(100); + + testHarness.processElement(new StreamRecord<>(6)); + + testHarness.setProcessingTime(200); + + + List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords()); + assertEquals(6, result.size()); + + Collections.sort(result); + assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), result); + + testHarness.close(); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } 
+ @Test public void testSlidingWindowSingleElements() throws Exception { try { @@ -420,6 +581,126 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { } @Test + public void testSlidingWindowSingleElementsWithProcessFunction() throws Exception { + try { + + // tumbling window that triggers every 20 milliseconds + AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op = + new AccumulatingProcessingTimeWindowOperator<>( + validatingIdentityProcessFunction, identitySelector, + IntSerializer.INSTANCE, IntSerializer.INSTANCE, 150, 50); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setProcessingTime(0); + + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(1)); + testHarness.processElement(new StreamRecord<>(2)); + + testHarness.setProcessingTime(50); + testHarness.setProcessingTime(100); + testHarness.setProcessingTime(150); + + List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords()); + + assertEquals(6, result.size()); + + Collections.sort(result); + assertEquals(Arrays.asList(1, 1, 1, 2, 2, 2), result); + + testHarness.close(); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void checkpointRestoreWithPendingWindowTumblingWithProcessFunction() { + try { + final int windowSize = 200; + + // tumbling window that triggers every 200 milliseconds + AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op = + new AccumulatingProcessingTimeWindowOperator<>( + validatingIdentityProcessFunction, identitySelector, + IntSerializer.INSTANCE, IntSerializer.INSTANCE, + windowSize, windowSize); + + OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = + new OneInputStreamOperatorTestHarness<>(op); + + testHarness.setup(); + testHarness.open(); + + 
testHarness.setProcessingTime(0); + + // inject some elements + final int numElementsFirst = 700; + final int numElements = 1000; + for (int i = 0; i < numElementsFirst; i++) { + testHarness.processElement(new StreamRecord<>(i)); + } + + // draw a snapshot and dispose the window + int beforeSnapShot = testHarness.getOutput().size(); + StreamStateHandle state = testHarness.snapshotLegacy(1L, System.currentTimeMillis()); + List<Integer> resultAtSnapshot = extractFromStreamRecords(testHarness.getOutput()); + int afterSnapShot = testHarness.getOutput().size(); + assertEquals("operator performed computation during snapshot", beforeSnapShot, afterSnapShot); + assertTrue(afterSnapShot <= numElementsFirst); + + // inject some random elements, which should not show up in the state + for (int i = 0; i < 300; i++) { + testHarness.processElement(new StreamRecord<>(i + numElementsFirst)); + } + + testHarness.close(); + op.dispose(); + + // re-create the operator and restore the state + op = new AccumulatingProcessingTimeWindowOperator<>( + validatingIdentityProcessFunction, identitySelector, + IntSerializer.INSTANCE, IntSerializer.INSTANCE, + windowSize, windowSize); + + testHarness = new OneInputStreamOperatorTestHarness<>(op); + + testHarness.setup(); + testHarness.restore(state); + testHarness.open(); + + // inject some more elements + for (int i = numElementsFirst; i < numElements; i++) { + testHarness.processElement(new StreamRecord<>(i)); + } + + testHarness.setProcessingTime(400); + + // get and verify the result + List<Integer> finalResult = new ArrayList<>(); + finalResult.addAll(resultAtSnapshot); + List<Integer> finalPartialResult = extractFromStreamRecords(testHarness.getOutput()); + finalResult.addAll(finalPartialResult); + assertEquals(numElements, finalResult.size()); + + Collections.sort(finalResult); + for (int i = 0; i < numElements; i++) { + assertEquals(i, finalResult.get(i).intValue()); + } + testHarness.close(); + } + catch (Exception e) { + 
e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test public void checkpointRestoreWithPendingWindowTumbling() { try { final int windowSize = 200; @@ -501,6 +782,98 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { } @Test + public void checkpointRestoreWithPendingWindowSlidingWithProcessFunction() { + try { + final int factor = 4; + final int windowSlide = 50; + final int windowSize = factor * windowSlide; + + // sliding window (200 msecs) every 50 msecs + AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op = + new AccumulatingProcessingTimeWindowOperator<>( + validatingIdentityProcessFunction, identitySelector, + IntSerializer.INSTANCE, IntSerializer.INSTANCE, + windowSize, windowSlide); + + OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = + new OneInputStreamOperatorTestHarness<>(op); + + testHarness.setProcessingTime(0); + + testHarness.setup(); + testHarness.open(); + + // inject some elements + final int numElements = 1000; + final int numElementsFirst = 700; + + for (int i = 0; i < numElementsFirst; i++) { + testHarness.processElement(new StreamRecord<>(i)); + } + + // draw a snapshot + List<Integer> resultAtSnapshot = extractFromStreamRecords(testHarness.getOutput()); + int beforeSnapShot = testHarness.getOutput().size(); + StreamStateHandle state = testHarness.snapshotLegacy(1L, System.currentTimeMillis()); + int afterSnapShot = testHarness.getOutput().size(); + assertEquals("operator performed computation during snapshot", beforeSnapShot, afterSnapShot); + + assertTrue(resultAtSnapshot.size() <= factor * numElementsFirst); + + // inject the remaining elements - these should not influence the snapshot + for (int i = numElementsFirst; i < numElements; i++) { + testHarness.processElement(new StreamRecord<>(i)); + } + + testHarness.close(); + + // re-create the operator and restore the state + op = new AccumulatingProcessingTimeWindowOperator<>( + validatingIdentityProcessFunction, 
identitySelector, + IntSerializer.INSTANCE, IntSerializer.INSTANCE, + windowSize, windowSlide); + + testHarness = new OneInputStreamOperatorTestHarness<>(op); + + testHarness.setup(); + testHarness.restore(state); + testHarness.open(); + + + // inject again the remaining elements + for (int i = numElementsFirst; i < numElements; i++) { + testHarness.processElement(new StreamRecord<>(i)); + } + + testHarness.setProcessingTime(50); + testHarness.setProcessingTime(100); + testHarness.setProcessingTime(150); + testHarness.setProcessingTime(200); + testHarness.setProcessingTime(250); + testHarness.setProcessingTime(300); + testHarness.setProcessingTime(350); + + // get and verify the result + List<Integer> finalResult = new ArrayList<>(resultAtSnapshot); + List<Integer> finalPartialResult = extractFromStreamRecords(testHarness.getOutput()); + finalResult.addAll(finalPartialResult); + assertEquals(factor * numElements, finalResult.size()); + + Collections.sort(finalResult); + for (int i = 0; i < factor * numElements; i++) { + assertEquals(i / factor, finalResult.get(i).intValue()); + } + + testHarness.close(); + op.dispose(); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test public void checkpointRestoreWithPendingWindowSliding() { try { final int factor = 4; @@ -601,8 +974,12 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { // tumbling window that triggers every 20 milliseconds AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op = new AccumulatingProcessingTimeWindowOperator<>( - new StatefulFunction(), identitySelector, - IntSerializer.INSTANCE, IntSerializer.INSTANCE, 50, 50); + new InternalIterableProcessWindowFunction<>(new StatefulFunction()), + identitySelector, + IntSerializer.INSTANCE, + IntSerializer.INSTANCE, + 50, + 50); OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, 
BasicTypeInfo.INT_TYPE_INFO); @@ -661,7 +1038,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { // ------------------------------------------------------------------------ - private static class StatefulFunction extends RichWindowFunction<Integer, Integer, Integer, TimeWindow> { + private static class StatefulFunction extends RichProcessWindowFunction<Integer, Integer, Integer, TimeWindow> { // we use a concurrent map here even though there is no concurrency, to // get "volatile" style access to entries @@ -677,8 +1054,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { } @Override - public void apply(Integer key, - TimeWindow window, + public void process(Integer key, + Context context, Iterable<Integer> values, Collector<Integer> out) throws Exception { for (Integer i : values) {
