[FLINK-5157] [streaming] Introduce ProcessAllWindowFunction
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/788b8392 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/788b8392 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/788b8392 Branch: refs/heads/master Commit: 788b839213811c6f2407ac6d54fef28dfa3d29a6 Parents: 87b9077 Author: Ventura Del Monte <[email protected]> Authored: Wed Feb 22 14:55:17 2017 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Tue Feb 28 14:02:56 2017 +0100 ---------------------------------------------------------------------- .../api/datastream/AllWindowedStream.java | 398 ++++++++++++++++- .../FoldApplyProcessAllWindowFunction.java | 120 +++++ .../windowing/ProcessAllWindowFunction.java | 59 +++ .../ReduceApplyProcessAllWindowFunction.java | 80 ++++ .../windowing/RichProcessAllWindowFunction.java | 84 ++++ ...ternalAggregateProcessAllWindowFunction.java | 83 ++++ ...nternalIterableProcessAllWindowFunction.java | 63 +++ ...rnalSingleValueProcessAllWindowFunction.java | 65 +++ ...nternalSingleValueProcessWindowFunction.java | 3 +- .../FoldApplyProcessWindowFunctionTest.java | 99 +++++ .../operators/StateDescriptorPassingTest.java | 19 + .../functions/InternalWindowFunctionTest.java | 193 +++++++- .../windowing/AllWindowTranslationTest.java | 445 +++++++++++++++++++ .../streaming/api/scala/AllWindowedStream.scala | 186 +++++++- .../function/ProcessAllWindowFunction.scala | 59 +++ .../function/RichProcessAllWindowFunction.scala | 86 ++++ .../ScalaProcessWindowFunctionWrapper.scala | 85 +++- .../api/scala/AllWindowTranslationTest.scala | 410 ++++++++++++++++- .../streaming/api/scala/WindowFoldITCase.scala | 60 ++- .../api/scala/WindowFunctionITCase.scala | 51 ++- .../api/scala/WindowReduceITCase.scala | 59 ++- ...ngIdentityRichProcessAllWindowFunction.scala | 81 ++++ .../streaming/runtime/WindowFoldITCase.java | 73 +++ 23 files changed, 2830 insertions(+), 31 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java index 742a2ed..a45cb0a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java @@ -40,9 +40,12 @@ import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator import org.apache.flink.streaming.api.functions.aggregation.SumAggregator; import org.apache.flink.streaming.api.functions.windowing.AggregateApplyAllWindowFunction; import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessAllWindowFunction; import org.apache.flink.streaming.api.functions.windowing.PassThroughAllWindowFunction; import org.apache.flink.streaming.api.functions.windowing.FoldApplyAllWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; import org.apache.flink.streaming.api.functions.windowing.ReduceApplyAllWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.ReduceApplyProcessAllWindowFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; @@ -52,8 +55,12 @@ import org.apache.flink.streaming.api.windowing.triggers.Trigger; import 
org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator; import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessAllWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessAllWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessAllWindowFunction; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction; import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -287,6 +294,102 @@ public class AllWindowedStream<T, W extends Window> { return input.transform(opName, resultType, operator).forceNonParallel(); } + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * <p> + * Arriving data is incrementally aggregated using the given reducer. + * + * @param reduceFunction The reduce function that is used for incremental aggregation. + * @param function The process window function. + * @return The data stream that is the result of applying the window function to the window. 
+ */ + @PublicEvolving + public <R> SingleOutputStreamOperator<R> reduce( + ReduceFunction<T> reduceFunction, + ProcessAllWindowFunction<T, R, W> function) { + + TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( + function, ProcessAllWindowFunction.class, true, true, input.getType(), null, false); + + return reduce(reduceFunction, function, resultType); + } + + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * <p> + * Arriving data is incrementally aggregated using the given reducer. + * + * @param reduceFunction The reduce function that is used for incremental aggregation. + * @param function The process window function. + * @param resultType Type information for the result type of the window function + * @return The data stream that is the result of applying the window function to the window. + */ + @PublicEvolving + public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessAllWindowFunction<T, R, W> function, TypeInformation<R> resultType) { + if (reduceFunction instanceof RichFunction) { + throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction."); + } + + //clean the closures + function = input.getExecutionEnvironment().clean(function); + reduceFunction = input.getExecutionEnvironment().clean(reduceFunction); + + String callLocation = Utils.getCallLocationName(); + String udfName = "AllWindowedStream." 
+ callLocation; + + String opName; + KeySelector<T, Byte> keySel = input.getKeySelector(); + + OneInputStreamOperator<T, R> operator; + + if (evictor != null) { + @SuppressWarnings({"unchecked", "rawtypes"}) + TypeSerializer<StreamRecord<T>> streamRecordSerializer = + (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig())); + + ListStateDescriptor<StreamRecord<T>> stateDesc = + new ListStateDescriptor<>("window-contents", streamRecordSerializer); + + opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")"; + + operator = + new EvictingWindowOperator<>(windowAssigner, + windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), + keySel, + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + stateDesc, + new InternalIterableProcessAllWindowFunction<>(new ReduceApplyProcessAllWindowFunction<>(reduceFunction, function)), + trigger, + evictor, + allowedLateness); + + } else { + ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents", + reduceFunction, + input.getType().createSerializer(getExecutionEnvironment().getConfig())); + + opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")"; + + operator = + new WindowOperator<>(windowAssigner, + windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), + keySel, + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + stateDesc, + new InternalSingleValueProcessAllWindowFunction<>(function), + trigger, + allowedLateness); + } + + return input.transform(opName, resultType, operator).forceNonParallel(); + } + // ------------------------------------------------------------------------ // AggregateFunction // ------------------------------------------------------------------------ @@ -483,6 +586,137 @@ public class 
AllWindowedStream<T, W extends Window> { return input.transform(opName, resultType, operator).forceNonParallel(); } + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * <p>Arriving data is incrementally aggregated using the given aggregate function. This means + * that the window function typically has only a single value to process when called. + * + * @param aggFunction The aggregate function that is used for incremental aggregation. + * @param windowFunction The process window function. + * + * @return The data stream that is the result of applying the window function to the window. + * + * @param <ACC> The type of the AggregateFunction's accumulator + * @param <V> The type of AggregateFunction's result, and the WindowFunction's input + * @param <R> The type of the elements in the resulting stream, equal to the + * WindowFunction's result type + */ + @PublicEvolving + public <ACC, V, R> SingleOutputStreamOperator<R> aggregate( + AggregateFunction<T, ACC, V> aggFunction, + ProcessAllWindowFunction<V, R, W> windowFunction) { + + checkNotNull(aggFunction, "aggFunction"); + checkNotNull(windowFunction, "windowFunction"); + + TypeInformation<ACC> accumulatorType = TypeExtractor.getAggregateFunctionAccumulatorType( + aggFunction, input.getType(), null, false); + + TypeInformation<V> aggResultType = TypeExtractor.getAggregateFunctionReturnType( + aggFunction, input.getType(), null, false); + + TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( + windowFunction, ProcessAllWindowFunction.class, true, true, aggResultType, null, false); + + return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType); + } + + /** + * Applies the given window function to each window. 
The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * <p>Arriving data is incrementally aggregated using the given aggregate function. This means + * that the window function typically has only a single value to process when called. + * + * @param aggregateFunction The aggregation function that is used for incremental aggregation. + * @param windowFunction The process window function. + * @param accumulatorType Type information for the internal accumulator type of the aggregation function + * @param resultType Type information for the result type of the window function + * + * @return The data stream that is the result of applying the window function to the window. + * + * @param <ACC> The type of the AggregateFunction's accumulator + * @param <V> The type of AggregateFunction's result, and the WindowFunction's input + * @param <R> The type of the elements in the resulting stream, equal to the + * WindowFunction's result type + */ + @PublicEvolving + public <ACC, V, R> SingleOutputStreamOperator<R> aggregate( + AggregateFunction<T, ACC, V> aggregateFunction, + ProcessAllWindowFunction<V, R, W> windowFunction, + TypeInformation<ACC> accumulatorType, + TypeInformation<V> aggregateResultType, + TypeInformation<R> resultType) { + + checkNotNull(aggregateFunction, "aggregateFunction"); + checkNotNull(windowFunction, "windowFunction"); + checkNotNull(accumulatorType, "accumulatorType"); + checkNotNull(aggregateResultType, "aggregateResultType"); + checkNotNull(resultType, "resultType"); + + if (aggregateFunction instanceof RichFunction) { + throw new UnsupportedOperationException("This aggregate function cannot be a RichFunction."); + } + + //clean the closures + windowFunction = input.getExecutionEnvironment().clean(windowFunction); + aggregateFunction = input.getExecutionEnvironment().clean(aggregateFunction); + + final String 
callLocation = Utils.getCallLocationName(); + final String udfName = "AllWindowedStream." + callLocation; + + final String opName; + final KeySelector<T, Byte> keySel = input.getKeySelector(); + + OneInputStreamOperator<T, R> operator; + + if (evictor != null) { + @SuppressWarnings({"unchecked", "rawtypes"}) + TypeSerializer<StreamRecord<T>> streamRecordSerializer = + (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer( + input.getType().createSerializer(getExecutionEnvironment().getConfig())); + + ListStateDescriptor<StreamRecord<T>> stateDesc = + new ListStateDescriptor<>("window-contents", streamRecordSerializer); + + opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")"; + + operator = new EvictingWindowOperator<>(windowAssigner, + windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), + keySel, + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + stateDesc, + new InternalAggregateProcessAllWindowFunction<>(aggregateFunction, windowFunction), + trigger, + evictor, + allowedLateness); + + } else { + AggregatingStateDescriptor<T, ACC, V> stateDesc = new AggregatingStateDescriptor<>( + "window-contents", + aggregateFunction, + accumulatorType.createSerializer(getExecutionEnvironment().getConfig())); + + opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")"; + + operator = new WindowOperator<>( + windowAssigner, + windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), + keySel, + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + stateDesc, + new InternalSingleValueProcessAllWindowFunction<>(windowFunction), + trigger, + allowedLateness); + } + + return input.transform(opName, resultType, operator).forceNonParallel(); + } + // ------------------------------------------------------------------------ // FoldFunction // 
------------------------------------------------------------------------ @@ -630,13 +864,119 @@ public class AllWindowedStream<T, W extends Window> { return input.transform(opName, resultType, operator).forceNonParallel(); } + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * <p> + * Arriving data is incrementally aggregated using the given fold function. + * + * @param initialValue The initial value of the fold. + * @param foldFunction The fold function that is used for incremental aggregation. + * @param function The window function. + * @return The data stream that is the result of applying the window function to the window. + */ + @PublicEvolving + public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessAllWindowFunction<ACC, R, W> function) { + + TypeInformation<ACC> foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(), + Utils.getCallLocationName(), true); + + TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( + function, ProcessAllWindowFunction.class, true, true, foldAccumulatorType, null, false); + + return fold(initialValue, foldFunction, function, foldAccumulatorType, resultType); + } + + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * <p> + * Arriving data is incrementally aggregated using the given fold function. + * + * @param initialValue The initial value of the fold. + * @param foldFunction The fold function that is used for incremental aggregation. + * @param function The process window function. 
+ * @param foldAccumulatorType Type information for the result type of the fold function + * @param resultType Type information for the result type of the window function + * @return The data stream that is the result of applying the window function to the window. + */ + @PublicEvolving + public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue, + FoldFunction<T, ACC> foldFunction, + ProcessAllWindowFunction<ACC, R, W> function, + TypeInformation<ACC> foldAccumulatorType, + TypeInformation<R> resultType) { + if (foldFunction instanceof RichFunction) { + throw new UnsupportedOperationException("FoldFunction of fold can not be a RichFunction."); + } + if (windowAssigner instanceof MergingWindowAssigner) { + throw new UnsupportedOperationException("Fold cannot be used with a merging WindowAssigner."); + } + + //clean the closures + function = input.getExecutionEnvironment().clean(function); + foldFunction = input.getExecutionEnvironment().clean(foldFunction); + + String callLocation = Utils.getCallLocationName(); + String udfName = "AllWindowedStream." 
+ callLocation; + + String opName; + KeySelector<T, Byte> keySel = input.getKeySelector(); + + OneInputStreamOperator<T, R> operator; + + if (evictor != null) { + @SuppressWarnings({"unchecked", "rawtypes"}) + TypeSerializer<StreamRecord<T>> streamRecordSerializer = + (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig())); + + ListStateDescriptor<StreamRecord<T>> stateDesc = + new ListStateDescriptor<>("window-contents", streamRecordSerializer); + + opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")"; + + operator = + new EvictingWindowOperator<>(windowAssigner, + windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), + keySel, + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + stateDesc, + new InternalIterableProcessAllWindowFunction<>(new FoldApplyProcessAllWindowFunction<>(initialValue, foldFunction, function, foldAccumulatorType)), + trigger, + evictor, + allowedLateness); + + } else { + FoldingStateDescriptor<T, ACC> stateDesc = new FoldingStateDescriptor<>("window-contents", + initialValue, foldFunction, foldAccumulatorType.createSerializer(getExecutionEnvironment().getConfig())); + + opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")"; + + operator = + new WindowOperator<>(windowAssigner, + windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), + keySel, + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + stateDesc, + new InternalSingleValueProcessAllWindowFunction<>(function), + trigger, + allowedLateness); + } + + return input.transform(opName, resultType, operator).forceNonParallel(); + } + // ------------------------------------------------------------------------ // Apply (Window Function) // ------------------------------------------------------------------------ 
/** * Applies the given window function to each window. The window function is called for each - * evaluation of the window for each key individually. The output of the window function is + * evaluation of the window. The output of the window function is * interpreted as a regular non-windowed stream. * * <p> @@ -647,15 +987,16 @@ public class AllWindowedStream<T, W extends Window> { * @return The data stream that is the result of applying the window function to the window. */ public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function) { + String callLocation = Utils.getCallLocationName(); + function = input.getExecutionEnvironment().clean(function); TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( function, AllWindowFunction.class, true, true, getInputType(), null, false); - - return apply(function, resultType); + return apply(new InternalIterableAllWindowFunction<>(function), resultType, callLocation); } /** * Applies the given window function to each window. The window function is called for each - * evaluation of the window for each key individually. The output of the window function is + * evaluation of the window. The output of the window function is * interpreted as a regular non-windowed stream. * * <p> @@ -663,15 +1004,56 @@ public class AllWindowedStream<T, W extends Window> { * is evaluated, as the function provides no means of incremental aggregation. * * @param function The window function. - * @param resultType Type information for the result type of the window function * @return The data stream that is the result of applying the window function to the window. 
*/ public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) { + String callLocation = Utils.getCallLocationName(); + function = input.getExecutionEnvironment().clean(function); + return apply(new InternalIterableAllWindowFunction<>(function), resultType, callLocation); + } - //clean the closure + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * <p> + * Note that this function requires that all data in the windows is buffered until the window + * is evaluated, as the function provides no means of incremental aggregation. + * + * @param function The process window function. + * @return The data stream that is the result of applying the window function to the window. + */ + @PublicEvolving + public <R> SingleOutputStreamOperator<R> process(ProcessAllWindowFunction<T, R, W> function) { + String callLocation = Utils.getCallLocationName(); function = input.getExecutionEnvironment().clean(function); + TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( + function, ProcessAllWindowFunction.class, true, true, getInputType(), null, false); + return apply(new InternalIterableProcessAllWindowFunction<>(function), resultType, callLocation); + } + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * <p> + * Note that this function requires that all data in the windows is buffered until the window + * is evaluated, as the function provides no means of incremental aggregation. + * + * @param function The process window function. + * @return The data stream that is the result of applying the window function to the window. 
+ */ + @PublicEvolving + public <R> SingleOutputStreamOperator<R> process(ProcessAllWindowFunction<T, R, W> function, TypeInformation<R> resultType) { String callLocation = Utils.getCallLocationName(); + function = input.getExecutionEnvironment().clean(function); + return apply(new InternalIterableProcessAllWindowFunction<>(function), resultType, callLocation); + } + + private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<T>, R, Byte, W> function, TypeInformation<R> resultType, String callLocation) { + String udfName = "AllWindowedStream." + callLocation; String opName; @@ -695,7 +1077,7 @@ public class AllWindowedStream<T, W extends Window> { keySel, input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), stateDesc, - new InternalIterableAllWindowFunction<>(function), + function, trigger, evictor, allowedLateness); @@ -712,7 +1094,7 @@ public class AllWindowedStream<T, W extends Window> { keySel, input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), stateDesc, - new InternalIterableAllWindowFunction<>(function), + function, trigger, allowedLateness); } http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java new file mode 100644 index 0000000..5ac6766 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.windowing; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.Collector; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Collections; + +@Internal +public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R> + extends RichProcessAllWindowFunction<T, R, W> + implements OutputTypeConfigurable<R> { + + private static final long serialVersionUID = 1L; + + private final FoldFunction<T, ACC> foldFunction; + private final 
ProcessAllWindowFunction<ACC, R, W> windowFunction; + + private byte[] serializedInitialValue; + private TypeSerializer<ACC> accSerializer; + private final TypeInformation<ACC> accTypeInformation; + private transient ACC initialValue; + + public FoldApplyProcessAllWindowFunction(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessAllWindowFunction<ACC, R, W> windowFunction, TypeInformation<ACC> accTypeInformation) { + this.windowFunction = windowFunction; + this.foldFunction = foldFunction; + this.initialValue = initialValue; + this.accTypeInformation = accTypeInformation; + } + + @Override + public void open(Configuration configuration) throws Exception { + FunctionUtils.openFunction(this.windowFunction, configuration); + + if (serializedInitialValue == null) { + throw new RuntimeException("No initial value was serialized for the fold " + + "window function. Probably the setOutputType method was not called."); + } + + ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue); + DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais); + initialValue = accSerializer.deserialize(in); + } + + @Override + public void close() throws Exception { + FunctionUtils.closeFunction(this.windowFunction); + } + + @Override + public void setRuntimeContext(RuntimeContext t) { + super.setRuntimeContext(t); + + FunctionUtils.setFunctionRuntimeContext(this.windowFunction, t); + } + + @Override + public void process(final Context context, Iterable<T> values, Collector<R> out) throws Exception { + ACC result = accSerializer.copy(initialValue); + + for (T val : values) { + result = foldFunction.fold(result, val); + } + + windowFunction.process(windowFunction.new Context() { + @Override + public W window() { + return context.window(); + } + }, Collections.singletonList(result), out); + } + + @Override + public void setOutputType(TypeInformation<R> outTypeInfo, ExecutionConfig executionConfig) { + accSerializer = 
accTypeInformation.createSerializer(executionConfig); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos); + + try { + accSerializer.serialize(initialValue, out); + } catch (IOException ioe) { + throw new RuntimeException("Unable to serialize initial value of type " + + initialValue.getClass().getSimpleName() + " of fold window function.", ioe); + } + + serializedInitialValue = baos.toByteArray(); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java new file mode 100644 index 0000000..622e020 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.windowing; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.Collector; + +/** + * Base abstract class for functions that are evaluated over non-keyed windows using a context + * for retrieving extra information. + * + * @param <IN> The type of the input value. + * @param <OUT> The type of the output value. + * @param <W> The type of {@code Window} that this window function can be applied on. + */ +@PublicEvolving +public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> implements Function { + + private static final long serialVersionUID = 1L; + + /** + * Evaluates the window and outputs none or several elements. + * + * @param context The context in which the window is being evaluated. + * @param elements The elements in the window being evaluated. + * @param out A collector for emitting elements. + * + * @throws Exception The function may throw exceptions to fail the program and trigger recovery. + */ + public abstract void process(Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception; + + /** + * The context holding window metadata. + * + * <p>Declared as an inner (non-static) class, so each instance belongs to one enclosing + * {@code ProcessAllWindowFunction} instance. + */ + public abstract class Context { + /** + * @return The window that is being evaluated. 
+ */ + public abstract W window(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java new file mode 100644 index 0000000..142c71e --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.functions.windowing; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.Collector; + +import java.util.Collections; + +/** + * Internal {@link ProcessAllWindowFunction} that first combines all elements of a window with a + * {@link ReduceFunction} and then passes the single combined value to a wrapped + * {@link ProcessAllWindowFunction}. + * + * @param <W> The type of {@code Window} that this function is applied on. + * @param <T> The type of the window elements and of the reduced value. + * @param <R> The type of the elements emitted by the wrapped function. + */ @Internal +public class ReduceApplyProcessAllWindowFunction<W extends Window, T, R> + extends RichProcessAllWindowFunction<T, R, W> { + + private static final long serialVersionUID = 1L; + + private final ReduceFunction<T> reduceFunction; + private final ProcessAllWindowFunction<T, R, W> windowFunction; + + public ReduceApplyProcessAllWindowFunction(ReduceFunction<T> reduceFunction, ProcessAllWindowFunction<T, R, W> windowFunction) { + this.windowFunction = windowFunction; + this.reduceFunction = reduceFunction; + } + + /** + * Reduces {@code input} to one value and calls the wrapped function with a one-element list + * holding that value and a context that exposes the same window as {@code context}. + * + * <p>If {@code input} yields no elements, the list handed to the wrapped function contains + * a single {@code null}. + */ @Override + public void process(final Context context, Iterable<T> input, Collector<R> out) throws Exception { + + T curr = null; + for (T val: input) { + if (curr == null) { /* curr is still null: take the element unchanged */ + curr = val; + } else { + curr = reduceFunction.reduce(curr, val); + } + } + windowFunction.process(windowFunction.new Context() { + @Override + public W window() { + return context.window(); + } + }, Collections.singletonList(curr), out); + } + + /** Delegates to {@code FunctionUtils.openFunction} for the wrapped window function. */ @Override + public void open(Configuration configuration) throws Exception { + FunctionUtils.openFunction(this.windowFunction, configuration); + } + + /** Delegates to {@code FunctionUtils.closeFunction} for the wrapped window function. */ @Override + public void close() throws Exception { + FunctionUtils.closeFunction(this.windowFunction); + } + + /** Sets the runtime context on this function and forwards it to the wrapped window function. */ @Override + public void setRuntimeContext(RuntimeContext t) { + super.setRuntimeContext(t); + + FunctionUtils.setFunctionRuntimeContext(this.windowFunction, t); + } + +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java new file mode 100644 index 0000000..1130fa5 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.windowing; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.IterationRuntimeContext; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.windowing.windows.Window; + +/** + * Base rich abstract class for functions that are evaluated over non-keyed windows using a context + * for passing extra information. + * + * <p>In addition to {@link ProcessAllWindowFunction}, this class implements {@link RichFunction}: it keeps + * the {@link RuntimeContext} handed to {@code setRuntimeContext} and provides empty {@code open} and + * {@code close} methods. + * + * @param <IN> The type of the input value. + * @param <OUT> The type of the output value. + * @param <W> The type of {@code Window} that this window function can be applied on. + */ +@PublicEvolving +public abstract class RichProcessAllWindowFunction<IN, OUT, W extends Window> + extends ProcessAllWindowFunction<IN, OUT, W> + implements RichFunction { + + private static final long serialVersionUID = 1L; + + + // -------------------------------------------------------------------------------------------- + // Runtime context access + // -------------------------------------------------------------------------------------------- + + private transient RuntimeContext runtimeContext; + + @Override + public void setRuntimeContext(RuntimeContext t) { + this.runtimeContext = t; + } + + @Override + public RuntimeContext getRuntimeContext() { + if (this.runtimeContext != null) { + return this.runtimeContext; + } else { + throw new IllegalStateException("The runtime context has not been initialized."); + } + } + + @Override + public IterationRuntimeContext getIterationRuntimeContext() { + if (this.runtimeContext == null) { + throw new IllegalStateException("The runtime context has not been initialized."); + } else if (this.runtimeContext instanceof IterationRuntimeContext) { + return (IterationRuntimeContext) this.runtimeContext; + } else { + throw new IllegalStateException("This stub is not part of 
an iteration step function."); + } + } + + // -------------------------------------------------------------------------------------------- + // Default life cycle methods + // -------------------------------------------------------------------------------------------- + + @Override + public void open(Configuration parameters) throws Exception {} + + @Override + public void close() throws Exception {} +} http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java new file mode 100644 index 0000000..9533c95 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing.functions; + +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.functions.IterationRuntimeContext; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.Collector; + +import java.util.Collections; + +/** + * Internal window function for wrapping a {@link ProcessAllWindowFunction} that takes an + * {@code Iterable} and an {@link AggregateFunction}. + * + * <p>All window elements are added to a newly created accumulator; the aggregate result is then + * passed to the wrapped function as a one-element list. + * + * @param <T> The type of the input to the AggregateFunction + * @param <ACC> The type of the AggregateFunction's accumulator + * @param <V> The type of the AggregateFunction's result, and the input to the ProcessAllWindowFunction + * @param <R> The result type of the ProcessAllWindowFunction + * @param <W> The window type + */ +public final class InternalAggregateProcessAllWindowFunction<T, ACC, V, R, W extends Window> + extends WrappingFunction<ProcessAllWindowFunction<V, R, W>> + implements InternalWindowFunction<Iterable<T>, R, Byte, W> { + + private static final long serialVersionUID = 1L; + + private final AggregateFunction<T, ACC, V> aggFunction; + + public InternalAggregateProcessAllWindowFunction( + AggregateFunction<T, ACC, V> aggFunction, + ProcessAllWindowFunction<V, R, W> windowFunction) { + super(windowFunction); + this.aggFunction = aggFunction; + } + + /** + * Aggregates {@code input} and invokes the wrapped function with a context that returns + * {@code window}. The {@code key} argument is not used. + */ @Override + public void apply(Byte key, final W window, Iterable<T> input, Collector<R> out) throws Exception { + ProcessAllWindowFunction<V, R, W> wrappedFunction = this.wrappedFunction; + ProcessAllWindowFunction<V, R, W>.Context context = wrappedFunction.new Context() { + @Override + public 
W window() { + return window; + } + }; + + final ACC acc = aggFunction.createAccumulator(); + + for (T val : input) { + aggFunction.add(val, acc); + } + + wrappedFunction.process(context, Collections.singletonList(aggFunction.getResult(acc)), out); + } + + /** Not available on this wrapper: always throws a {@link RuntimeException}. */ @Override + public RuntimeContext getRuntimeContext() { + throw new RuntimeException("This should never be called."); + } + + /** Not available on this wrapper: always throws a {@link RuntimeException}. */ @Override + public IterationRuntimeContext getIterationRuntimeContext() { + throw new RuntimeException("This should never be called."); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java new file mode 100644 index 0000000..e33cc2a --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing.functions; + +import org.apache.flink.api.common.functions.IterationRuntimeContext; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.Collector; + +/** + * Internal window function for wrapping a {@link ProcessAllWindowFunction} that takes an {@code Iterable} + * when the window state also is an {@code Iterable}. + * + * <p>The window contents are handed to the wrapped function unchanged. + * + * @param <IN> The type of the window elements + * @param <OUT> The type of the elements emitted by the wrapped function + * @param <W> The window type 
+ */ +public final class InternalIterableProcessAllWindowFunction<IN, OUT, W extends Window> + extends WrappingFunction<ProcessAllWindowFunction<IN, OUT, W>> + implements InternalWindowFunction<Iterable<IN>, OUT, Byte, W> { + + private static final long serialVersionUID = 1L; + + public InternalIterableProcessAllWindowFunction(ProcessAllWindowFunction<IN, OUT, W> wrappedFunction) { + super(wrappedFunction); + } + + /** + * Invokes the wrapped function with {@code input} and a context that returns {@code window}. + * The {@code key} argument is not used. + */ @Override + public void apply(Byte key, final W window, Iterable<IN> input, Collector<OUT> out) throws Exception { + ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction; + ProcessAllWindowFunction<IN, OUT, W>.Context context = wrappedFunction.new Context() { + @Override + public W window() { + return window; + } + }; + + wrappedFunction.process(context, input, out); + } + + /** Not available on this wrapper: always throws a {@link RuntimeException}. */ @Override + public RuntimeContext getRuntimeContext() { + throw new RuntimeException("This should never be called."); + } + + /** Not available on this wrapper: always throws a {@link RuntimeException}. */ @Override + public IterationRuntimeContext getIterationRuntimeContext() { + throw new RuntimeException("This should never be called."); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java new file mode 100644 index 0000000..0284ef7 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing.functions; + +import org.apache.flink.api.common.functions.IterationRuntimeContext; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.Collector; + +import java.util.Collections; + +/** + * Internal window function for wrapping a {@link ProcessAllWindowFunction} that takes an {@code Iterable} + * when the window state is a single value. + * + * <p>The single value is wrapped in a one-element list before it is handed to the wrapped function. + * + * @param <IN> The type of the single window value + * @param <OUT> The type of the elements emitted by the wrapped function + * @param <W> The window type 
+ */ +public final class InternalSingleValueProcessAllWindowFunction<IN, OUT, W extends Window> + extends WrappingFunction<ProcessAllWindowFunction<IN, OUT, W>> + implements InternalWindowFunction<IN, OUT, Byte, W> { + + private static final long serialVersionUID = 1L; + + public InternalSingleValueProcessAllWindowFunction(ProcessAllWindowFunction<IN, OUT, W> wrappedFunction) { + super(wrappedFunction); + } + + /** + * Invokes the wrapped function with a one-element list containing {@code input} and a context + * that returns {@code window}. The {@code key} argument is not used. + */ @Override + public void apply(Byte key, final W window, IN input, Collector<OUT> out) throws Exception { + ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction; + ProcessAllWindowFunction<IN, OUT, W>.Context context = wrappedFunction.new Context() { + @Override + public W window() { + return window; + } + }; + + wrappedFunction.process(context, Collections.singletonList(input), out); + } + + /** Not available on this wrapper: always throws a {@link RuntimeException}. */ @Override + public RuntimeContext getRuntimeContext() { + throw new RuntimeException("This should never be called."); + } + + /** Not available on this wrapper: always throws a {@link RuntimeException}. */ @Override + public IterationRuntimeContext getIterationRuntimeContext() { + throw new RuntimeException("This should never be called."); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java index b28c208..7a4e8c6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java @@ -21,14 +21,13 @@ import org.apache.flink.api.common.functions.IterationRuntimeContext; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.java.operators.translation.WrappingFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; -import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; import java.util.Collections; /** - * Internal window function for wrapping a {@link WindowFunction} that takes an {@code Iterable} + * Internal window function for wrapping a {@link ProcessWindowFunction} that takes an {@code Iterable} * when the window state is a single value. */ public final class InternalSingleValueProcessWindowFunction<IN, OUT, KEY, W extends Window> http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java index af5c77a..734879d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java @@ -22,14 +22,17 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.util.ListCollector; import 
org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.ByteSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessAllWindowFunction; import org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessWindowFunction; import org.apache.flink.streaming.api.functions.windowing.FoldApplyWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.graph.StreamGraph; @@ -39,6 +42,7 @@ import org.apache.flink.streaming.api.transformations.SourceTransformation; import org.apache.flink.streaming.api.transformations.StreamTransformation; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessAllWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction; import org.apache.flink.util.Collector; import org.junit.Test; @@ -145,6 +149,101 @@ public class FoldApplyProcessWindowFunctionTest { Assert.assertEquals(expected, result); } + /** + * Tests that the FoldWindowFunction gets the output type serializer set by the + * StreamGraphGenerator and checks that the FoldWindowFunction computes the correct result. 
+ */ + @Test + public void testFoldAllWindowFunctionOutputTypeConfigurable() throws Exception{ + StreamExecutionEnvironment env = new DummyStreamExecutionEnvironment(); + + List<StreamTransformation<?>> transformations = new ArrayList<>(); + + int initValue = 1; + + FoldApplyProcessAllWindowFunction<TimeWindow, Integer, Integer, Integer> foldWindowFunction = new FoldApplyProcessAllWindowFunction<>( + initValue, + new FoldFunction<Integer, Integer>() { + @Override + public Integer fold(Integer accumulator, Integer value) throws Exception { + return accumulator + value; + } + + }, + new ProcessAllWindowFunction<Integer, Integer, TimeWindow>() { + @Override + public void process(Context context, + Iterable<Integer> input, + Collector<Integer> out) throws Exception { + for (Integer in: input) { + out.collect(in); + } + } + }, + BasicTypeInfo.INT_TYPE_INFO + ); + + AccumulatingProcessingTimeWindowOperator<Byte, Integer, Integer> windowOperator = new AccumulatingProcessingTimeWindowOperator<>( + new InternalIterableProcessAllWindowFunction<>(foldWindowFunction), + new KeySelector<Integer, Byte>() { + private static final long serialVersionUID = -7951310554369722809L; + + @Override + public Byte getKey(Integer value) throws Exception { + return 0; + } + }, + ByteSerializer.INSTANCE, + IntSerializer.INSTANCE, + 3000, + 3000 + ); + + SourceFunction<Integer> sourceFunction = new SourceFunction<Integer>(){ + + private static final long serialVersionUID = 8297735565464653028L; + + @Override + public void run(SourceContext<Integer> ctx) throws Exception { + + } + + @Override + public void cancel() { + + } + }; + + SourceTransformation<Integer> source = new SourceTransformation<>("", new StreamSource<>(sourceFunction), BasicTypeInfo.INT_TYPE_INFO, 1); + + transformations.add(new OneInputTransformation<>(source, "test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1)); + + StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations); + + List<Integer> result = 
new ArrayList<>(); + List<Integer> input = new ArrayList<>(); + List<Integer> expected = new ArrayList<>(); + + input.add(1); + input.add(2); + input.add(3); + + for (int value : input) { + initValue += value; + } + + expected.add(initValue); + + foldWindowFunction.process(foldWindowFunction.new Context() { + @Override + public TimeWindow window() { + return new TimeWindow(0, 1); + } + }, input, new ListCollector<>(result)); + + Assert.assertEquals(expected, result); + } + public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java index 813ca96..f306231 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java @@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.transformations.OneInputTransformation; @@ -162,6 +163,24 @@ public class StateDescriptorPassingTest { } 
@Test + public void testProcessAllWindowState() throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class); + + DataStream<File> src = env.fromElements(new File("/")); + + SingleOutputStreamOperator<?> result = src + .timeWindowAll(Time.milliseconds(1000)) + .process(new ProcessAllWindowFunction<File, String, TimeWindow>() { + @Override + public void process(Context ctx, Iterable<File> input, Collector<String> out) {} + }); + + validateListStateDescriptorConfigured(result); + } + + @Test public void testFoldWindowAllState() throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java index e49a496..8f795e9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java @@ -26,15 +26,19 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils; import 
org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.RichProcessAllWindowFunction; import org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunction; import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessAllWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessAllWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessAllWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction; import org.apache.flink.util.Collector; @@ -99,6 +103,47 @@ public class InternalWindowFunctionTest { @SuppressWarnings("unchecked") @Test + public void testInternalIterableProcessAllWindowFunction() throws Exception { + + ProcessAllWindowFunctionMock mock = mock(ProcessAllWindowFunctionMock.class); + InternalIterableProcessAllWindowFunction<Long, String, TimeWindow> windowFunction = + new 
InternalIterableProcessAllWindowFunction<>(mock); + + // check setOutputType + TypeInformation<String> stringType = BasicTypeInfo.STRING_TYPE_INFO; + ExecutionConfig execConf = new ExecutionConfig(); + execConf.setParallelism(42); + + StreamingFunctionUtils.setOutputType(windowFunction, stringType, execConf); + verify(mock).setOutputType(stringType, execConf); + + // check open + Configuration config = new Configuration(); + + windowFunction.open(config); + verify(mock).open(config); + + // check setRuntimeContext + RuntimeContext rCtx = mock(RuntimeContext.class); + + windowFunction.setRuntimeContext(rCtx); + verify(mock).setRuntimeContext(rCtx); + + // check apply + TimeWindow w = mock(TimeWindow.class); + Iterable<Long> i = (Iterable<Long>)mock(Iterable.class); + Collector<String> c = (Collector<String>) mock(Collector.class); + + windowFunction.apply(((byte)0), w, i, c); + verify(mock).process((ProcessAllWindowFunctionMock.Context) anyObject(), eq(i), eq(c)); + + // check close + windowFunction.close(); + verify(mock).close(); + } + + @SuppressWarnings("unchecked") + @Test public void testInternalIterableWindowFunction() throws Exception { WindowFunctionMock mock = mock(WindowFunctionMock.class); @@ -263,6 +308,47 @@ public class InternalWindowFunctionTest { @SuppressWarnings("unchecked") @Test + public void testInternalSingleValueProcessAllWindowFunction() throws Exception { + + ProcessAllWindowFunctionMock mock = mock(ProcessAllWindowFunctionMock.class); + InternalSingleValueProcessAllWindowFunction<Long, String, TimeWindow> windowFunction = + new InternalSingleValueProcessAllWindowFunction<>(mock); + + // check setOutputType + TypeInformation<String> stringType = BasicTypeInfo.STRING_TYPE_INFO; + ExecutionConfig execConf = new ExecutionConfig(); + execConf.setParallelism(42); + + StreamingFunctionUtils.setOutputType(windowFunction, stringType, execConf); + + verify(mock).setOutputType(stringType, execConf); + + // check open + Configuration config = new 
Configuration(); + + windowFunction.open(config); + verify(mock).open(config); + + // check setRuntimeContext + RuntimeContext rCtx = mock(RuntimeContext.class); + + windowFunction.setRuntimeContext(rCtx); + verify(mock).setRuntimeContext(rCtx); + + // check apply + TimeWindow w = mock(TimeWindow.class); + Collector<String> c = (Collector<String>) mock(Collector.class); + + windowFunction.apply(((byte)0), w, 23L, c); + verify(mock).process((ProcessAllWindowFunctionMock.Context) anyObject(), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c)); + + // check close + windowFunction.close(); + verify(mock).close(); + } + + @SuppressWarnings("unchecked") + @Test public void testInternalSingleValueProcessWindowFunction() throws Exception { ProcessWindowFunctionMock mock = mock(ProcessWindowFunctionMock.class); @@ -310,7 +396,7 @@ public class InternalWindowFunctionTest { InternalAggregateProcessWindowFunction<Long, Set<Long>, Map<Long, Long>, String, Long, TimeWindow> windowFunction = new InternalAggregateProcessWindowFunction<>(new AggregateFunction<Long, Set<Long>, Map<Long, Long>>() { private static final long serialVersionUID = 1L; - + @Override public Set<Long> createAccumulator() { return new HashSet<>(); @@ -364,7 +450,7 @@ public class InternalWindowFunctionTest { List<Long> args = new LinkedList<>(); args.add(23L); args.add(24L); - + windowFunction.apply(42L, w, args, c); verify(mock).process( eq(42L), @@ -379,6 +465,83 @@ public class InternalWindowFunctionTest { verify(mock).close(); } + @SuppressWarnings("unchecked") + @Test + public void testInternalAggregateProcessAllWindowFunction() throws Exception { + + AggregateProcessAllWindowFunctionMock mock = mock(AggregateProcessAllWindowFunctionMock.class); + + InternalAggregateProcessAllWindowFunction<Long, Set<Long>, Map<Long, Long>, String, TimeWindow> windowFunction = + new InternalAggregateProcessAllWindowFunction<>(new AggregateFunction<Long, Set<Long>, Map<Long, Long>>() { + private 
static final long serialVersionUID = 1L; + + @Override + public Set<Long> createAccumulator() { + return new HashSet<>(); + } + + @Override + public void add(Long value, Set<Long> accumulator) { + accumulator.add(value); + } + + @Override + public Map<Long, Long> getResult(Set<Long> accumulator) { + Map<Long, Long> result = new HashMap<>(); + for (Long in : accumulator) { + result.put(in, in); + } + return result; + } + + @Override + public Set<Long> merge(Set<Long> a, Set<Long> b) { + a.addAll(b); + return a; + } + }, mock); + + // check setOutputType + TypeInformation<String> stringType = BasicTypeInfo.STRING_TYPE_INFO; + ExecutionConfig execConf = new ExecutionConfig(); + execConf.setParallelism(42); + + StreamingFunctionUtils.setOutputType(windowFunction, stringType, execConf); + verify(mock).setOutputType(stringType, execConf); + + // check open + Configuration config = new Configuration(); + + windowFunction.open(config); + verify(mock).open(config); + + // check setRuntimeContext + RuntimeContext rCtx = mock(RuntimeContext.class); + + windowFunction.setRuntimeContext(rCtx); + verify(mock).setRuntimeContext(rCtx); + + // check apply + TimeWindow w = mock(TimeWindow.class); + Collector<String> c = (Collector<String>) mock(Collector.class); + + List<Long> args = new LinkedList<>(); + args.add(23L); + args.add(24L); + + windowFunction.apply(((byte)0), w, args, c); + verify(mock).process( + (AggregateProcessAllWindowFunctionMock.Context) anyObject(), + (Iterable) argThat(containsInAnyOrder(allOf( + hasEntry(is(23L), is(23L)), + hasEntry(is(24L), is(24L))))), + eq(c)); + + // check close + windowFunction.close(); + verify(mock).close(); + } + public static class ProcessWindowFunctionMock extends RichProcessWindowFunction<Long, String, Long, TimeWindow> implements OutputTypeConfigurable<String> { @@ -405,6 +568,19 @@ public class InternalWindowFunctionTest { public void process(Long aLong, Context context, Iterable<Map<Long, Long>> input, Collector<String> out) 
throws Exception { } } + public static class AggregateProcessAllWindowFunctionMock + extends RichProcessAllWindowFunction<Map<Long, Long>, String, TimeWindow> + implements OutputTypeConfigurable<String> { + + private static final long serialVersionUID = 1L; + + @Override + public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig executionConfig) { } + + @Override + public void process(Context context, Iterable<Map<Long, Long>> input, Collector<String> out) throws Exception { } + } + public static class WindowFunctionMock extends RichWindowFunction<Long, String, Long, TimeWindow> implements OutputTypeConfigurable<String> { @@ -430,4 +606,17 @@ public class InternalWindowFunctionTest { @Override public void apply(TimeWindow window, Iterable<Long> values, Collector<String> out) throws Exception { } } + + public static class ProcessAllWindowFunctionMock + extends RichProcessAllWindowFunction<Long, String, TimeWindow> + implements OutputTypeConfigurable<String> { + + private static final long serialVersionUID = 1L; + + @Override + public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig executionConfig) { } + + @Override + public void process(Context context, Iterable<Long> input, Collector<String> out) throws Exception { } + } }
