[FLINK-3521] Make Iterable part of method signature for WindowFunction This closes #1723
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ba069f35 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ba069f35 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ba069f35 Branch: refs/heads/release-1.0 Commit: ba069f35b21de5d2a30ba0cd2234f20f35532c09 Parents: 603f351 Author: Aljoscha Krettek <[email protected]> Authored: Fri Feb 26 15:19:50 2016 +0100 Committer: Robert Metzger <[email protected]> Committed: Sat Feb 27 00:10:52 2016 +0100 ---------------------------------------------------------------------- .../ml/IncrementalLearningSkeleton.java | 2 +- .../GroupedProcessingTimeWindowExample.java | 2 +- .../api/datastream/AllWindowedStream.java | 8 +-- .../api/datastream/CoGroupedStreams.java | 2 +- .../api/datastream/WindowedStream.java | 39 +++++------ .../functions/windowing/AllWindowFunction.java | 5 +- .../windowing/FoldApplyAllWindowFunction.java | 5 +- .../windowing/FoldApplyWindowFunction.java | 5 +- .../windowing/PassThroughAllWindowFunction.java | 6 +- .../windowing/PassThroughWindowFunction.java | 6 +- .../windowing/ReduceApplyAllWindowFunction.java | 6 +- .../windowing/ReduceApplyWindowFunction.java | 6 +- .../ReduceIterableAllWindowFunction.java | 2 +- .../windowing/ReduceIterableWindowFunction.java | 2 +- .../api/functions/windowing/WindowFunction.java | 3 +- .../windowing/AccumulatingKeyedTimePanes.java | 8 +-- ...ccumulatingProcessingTimeWindowOperator.java | 6 +- .../EvictingNonKeyedWindowOperator.java | 2 +- .../windowing/EvictingWindowOperator.java | 6 +- .../windowing/NonKeyedWindowOperator.java | 4 +- .../operators/windowing/WindowOperator.java | 12 ++-- .../InternalIterableWindowFunction.java | 72 +++++++++++++++++++ .../InternalSingleValueWindowFunction.java | 74 ++++++++++++++++++++ .../functions/InternalWindowFunction.java | 47 +++++++++++++ .../flink/streaming/api/DataStreamTest.java | 2 +- .../operators/FoldApplyWindowFunctionTest.java | 6 +- ...AlignedProcessingTimeWindowOperatorTest.java | 10 +-- .../windowing/AllWindowTranslationTest.java | 6 +- .../windowing/EvictingWindowOperatorTest.java | 7 +- .../windowing/TimeWindowTranslationTest.java | 6 +- .../operators/windowing/WindowOperatorTest.java | 16 +++-- .../windowing/WindowTranslationTest.java | 7 +- .../streaming/api/scala/AllWindowedStream.scala | 43 ++++++++---- .../streaming/api/scala/WindowedStream.scala | 47 +++++++++---- .../api/scala/function/AllWindowFunction.scala | 45 ++++++++++++ .../api/scala/function/WindowFunction.scala | 47 +++++++++++++ .../api/scala/AllWindowTranslationTest.scala | 12 ++-- .../api/scala/WindowTranslationTest.scala | 12 ++-- .../EventTimeAllWindowCheckpointingITCase.java | 22 ++++-- .../EventTimeWindowCheckpointingITCase.java | 24 +++++-- .../WindowCheckpointingITCase.java | 4 +- 41 files changed, 498 insertions(+), 148 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java index acbc5d6..4108485 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java @@ -167,7 +167,7 @@ public class IncrementalLearningSkeleton { /** * Builds up-to-date partial models on new training data. */ - public static class PartialModelBuilder implements AllWindowFunction<Iterable<Integer>, Double[], TimeWindow> { + public static class PartialModelBuilder implements AllWindowFunction<Integer, Double[], TimeWindow> { private static final long serialVersionUID = 1L; protected Double[] buildPartialModel(Iterable<Integer> values) { http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java index 196b73e..f08069b 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java @@ -104,7 +104,7 @@ public class GroupedProcessingTimeWindowExample { } } - public static class SummingWindowFunction implements WindowFunction<Iterable<Tuple2<Long, Long>>, Tuple2<Long, Long>, Long, Window> { + public static class SummingWindowFunction implements WindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long, Window> { @Override public void apply(Long key, Window window, Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) { http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java index 56640d3..6b32880 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java @@ -26,7 +26,6 @@ import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.Utils; -import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction; @@ -222,11 +221,10 @@ public class AllWindowedStream<T, W extends Window> { * @param function The window function. * @return The data stream that is the result of applying the window function to the window. */ - public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<Iterable<T>, R, W> function) { + public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<T, R, W> function) { @SuppressWarnings("unchecked, rawtypes") - TypeInformation<Iterable<T>> iterTypeInfo = new GenericTypeInfo<>((Class) Iterable.class); TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( - function, AllWindowFunction.class, true, true, iterTypeInfo, null, false); + function, AllWindowFunction.class, true, true, getInputType(), null, false); return apply(function, resultType); } @@ -242,7 +240,7 @@ public class AllWindowedStream<T, W extends Window> { * @param function The window function. * @return The data stream that is the result of applying the window function to the window. */ - public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<Iterable<T>, R, W> function, TypeInformation<R> resultType) { + public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) { //clean the closure function = input.getExecutionEnvironment().clean(function); http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java index e921940..713433c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java @@ -556,7 +556,7 @@ public class CoGroupedStreams<T1, T2> { private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window> extends WrappingFunction<CoGroupFunction<T1, T2, T>> - implements WindowFunction<Iterable<TaggedUnion<T1, T2>>, T, KEY, W> { + implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index 2ced99d..5c92fe0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -30,7 +30,6 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction; @@ -52,6 +51,8 @@ import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator; import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator; import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; @@ -218,11 +219,9 @@ public class WindowedStream<T, K, W extends Window> { * @param function The window function. * @return The data stream that is the result of applying the window function to the window. */ - public <R> SingleOutputStreamOperator<R, ?> apply(WindowFunction<Iterable<T>, R, K, W> function) { - @SuppressWarnings("unchecked, rawtypes") - TypeInformation<Iterable<T>> iterTypeInfo = new GenericTypeInfo<>((Class) Iterable.class); + public <R> SingleOutputStreamOperator<R, ?> apply(WindowFunction<T, R, K, W> function) { TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( - function, WindowFunction.class, true, true, iterTypeInfo, null, false); + function, WindowFunction.class, true, true, getInputType(), null, false); return apply(function, resultType); } @@ -240,7 +239,7 @@ public class WindowedStream<T, K, W extends Window> { * @param resultType Type information for the result type of the window function * @return The data stream that is the result of applying the window function to the window. */ - public <R> SingleOutputStreamOperator<R, ?> apply(WindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> resultType) { + public <R> SingleOutputStreamOperator<R, ?> apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) { //clean the closure function = input.getExecutionEnvironment().clean(function); @@ -270,7 +269,7 @@ public class WindowedStream<T, K, W extends Window> { keySel, input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), stateDesc, - function, + new InternalIterableWindowFunction<>(function), trigger, evictor); @@ -285,7 +284,7 @@ public class WindowedStream<T, K, W extends Window> { keySel, input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), stateDesc, - function, + new InternalIterableWindowFunction<>(function), trigger); } @@ -350,13 +349,13 @@ public class WindowedStream<T, K, W extends Window> { opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")"; operator = new EvictingWindowOperator<>(windowAssigner, - windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), - keySel, - input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), - stateDesc, - new ReduceApplyWindowFunction<>(reduceFunction, function), - trigger, - evictor); + windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), + keySel, + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + stateDesc, + new InternalIterableWindowFunction<>(new ReduceApplyWindowFunction<>(reduceFunction, function)), + trigger, + evictor); } else { ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents", @@ -370,7 +369,7 @@ public class WindowedStream<T, K, W extends Window> { keySel, input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), stateDesc, - function, + new InternalSingleValueWindowFunction<>(function), trigger); } @@ -441,7 +440,7 @@ public class WindowedStream<T, K, W extends Window> { keySel, input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), stateDesc, - new FoldApplyWindowFunction<>(initialValue, foldFunction, function), + new InternalIterableWindowFunction<>(new FoldApplyWindowFunction<>(initialValue, foldFunction, function)), trigger, evictor); @@ -458,7 +457,7 @@ public class WindowedStream<T, K, W extends Window> { keySel, input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), stateDesc, - function, + new InternalSingleValueWindowFunction<>(function), trigger); } @@ -694,7 +693,7 @@ public class WindowedStream<T, K, W extends Window> { } else if (function instanceof WindowFunction) { @SuppressWarnings("unchecked") - WindowFunction<Iterable<T>, R, K, TimeWindow> wf = (WindowFunction<Iterable<T>, R, K, TimeWindow>) function; + WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function; OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>( wf, input.getKeySelector(), @@ -726,7 +725,7 @@ public class WindowedStream<T, K, W extends Window> { } else if (function instanceof WindowFunction) { @SuppressWarnings("unchecked") - WindowFunction<Iterable<T>, R, K, TimeWindow> wf = (WindowFunction<Iterable<T>, R, K, TimeWindow>) function; + WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function; OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>( wf, input.getKeySelector(), http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java index 62e86ca..c497b4a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java @@ -30,9 +30,10 @@ import java.io.Serializable; * * @param <IN> The type of the input value. * @param <OUT> The type of the output value. + * @param <W> The type of {@code Window} that this window function can be applied on. */ @Public -public interface AllWindowFunction<IN, OUT, W extends Window> extends Function, Serializable { +public interface AllWindowFunction<IN, OUT, W extends Window> extends Function, Serializable { /** * Evaluates the window and outputs none or several elements. @@ -43,5 +44,5 @@ public interface AllWindowFunction<IN, OUT, W extends Window> extends Function, * * @throws Exception The function may throw exceptions to fail the program and trigger recovery. */ - void apply(W window, IN values, Collector<OUT> out) throws Exception; + void apply(W window, Iterable<IN> values, Collector<OUT> out) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java index 76fd562..a5bc0a1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java @@ -33,11 +33,12 @@ import org.apache.flink.util.Collector; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.Collections; @Internal public class FoldApplyAllWindowFunction<W extends Window, T, ACC> extends WrappingFunction<AllWindowFunction<ACC, ACC, W>> - implements AllWindowFunction<Iterable<T>, ACC, W>, OutputTypeConfigurable<ACC> { + implements AllWindowFunction<T, ACC, W>, OutputTypeConfigurable<ACC> { private static final long serialVersionUID = 1L; @@ -75,7 +76,7 @@ public class FoldApplyAllWindowFunction<W extends Window, T, ACC> result = foldFunction.fold(result, val); } - wrappedFunction.apply(window, result, out); + wrappedFunction.apply(window, Collections.singletonList(result), out); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java index 40e8830..756a683 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java @@ -33,11 +33,12 @@ import org.apache.flink.util.Collector; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.Collections; @Internal public class FoldApplyWindowFunction<K, W extends Window, T, ACC> extends WrappingFunction<WindowFunction<ACC, ACC, K, W>> - implements WindowFunction<Iterable<T>, ACC, K, W>, OutputTypeConfigurable<ACC> { + implements WindowFunction<T, ACC, K, W>, OutputTypeConfigurable<ACC> { private static final long serialVersionUID = 1L; @@ -75,7 +76,7 @@ public class FoldApplyWindowFunction<K, W extends Window, T, ACC> result = foldFunction.fold(result, val); } - wrappedFunction.apply(key, window, result, out); + wrappedFunction.apply(key, window, Collections.singletonList(result), out); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughAllWindowFunction.java index 3e3ffca..4435644 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughAllWindowFunction.java @@ -27,7 +27,9 @@ public class PassThroughAllWindowFunction<W extends Window, T> implements AllWin private static final long serialVersionUID = 1L; @Override - public void apply(W window, T input, Collector<T> out) throws Exception { - out.collect(input); + public void apply(W window, Iterable<T> input, Collector<T> out) throws Exception { + for (T in: input) { + out.collect(in); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughWindowFunction.java index 21709b9..319acb6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughWindowFunction.java @@ -27,7 +27,9 @@ public class PassThroughWindowFunction<K, W extends Window, T> implements Window private static final long serialVersionUID = 1L; @Override - public void apply(K k, W window, T input, Collector<T> out) throws Exception { - out.collect(input); + public void apply(K k, W window, Iterable<T> input, Collector<T> out) throws Exception { + for (T in: input) { + out.collect(in); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyAllWindowFunction.java index ce1615b..5b8dd70 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyAllWindowFunction.java @@ -23,10 +23,12 @@ import org.apache.flink.api.java.operators.translation.WrappingFunction; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; +import java.util.Collections; + @Internal public class ReduceApplyAllWindowFunction<W extends Window, T, R> extends WrappingFunction<AllWindowFunction<T, R, W>> - implements AllWindowFunction<Iterable<T>, R, W> { + implements AllWindowFunction<T, R, W> { private static final long serialVersionUID = 1L; @@ -51,6 +53,6 @@ public class ReduceApplyAllWindowFunction<W extends Window, T, R> curr = reduceFunction.reduce(curr, val); } } - windowFunction.apply(window, curr, out); + windowFunction.apply(window, Collections.singletonList(curr), out); } } http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyWindowFunction.java index 75ea2d2..f896282 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyWindowFunction.java @@ -23,10 +23,12 @@ import org.apache.flink.api.java.operators.translation.WrappingFunction; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; +import java.util.Collections; + @Internal public class ReduceApplyWindowFunction<K, W extends Window, T, R> extends WrappingFunction<WindowFunction<T, R, K, W>> - implements WindowFunction<Iterable<T>, R, K, W> { + implements WindowFunction<T, R, K, W> { private static final long serialVersionUID = 1L; @@ -51,6 +53,6 @@ public class ReduceApplyWindowFunction<K, W extends Window, T, R> curr = reduceFunction.reduce(curr, val); } } - windowFunction.apply(k, window, curr, out); + windowFunction.apply(k, window, Collections.singletonList(curr), out); } } http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableAllWindowFunction.java index a3b35ae..8ec5809 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableAllWindowFunction.java @@ -23,7 +23,7 @@ import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; @Internal -public class ReduceIterableAllWindowFunction<W extends Window, T> implements AllWindowFunction<Iterable<T>, T, W> { +public class ReduceIterableAllWindowFunction<W extends Window, T> implements AllWindowFunction<T, T, W> { private static final long serialVersionUID = 1L; private final ReduceFunction<T> reduceFunction; http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableWindowFunction.java index e296411..afb0219 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableWindowFunction.java @@ -23,7 +23,7 @@ import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; @Internal -public class ReduceIterableWindowFunction<K, W extends Window, T> implements WindowFunction<Iterable<T>, T, K, W> { +public class ReduceIterableWindowFunction<K, W extends Window, T> implements WindowFunction<T, T, K, W> { private static final long serialVersionUID = 1L; private final ReduceFunction<T> reduceFunction; http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java index 83ef18e..154fe88 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java @@ -31,6 +31,7 @@ import java.io.Serializable; * @param <IN> The type of the input value. * @param <OUT> The type of the output value. * @param <KEY> The type of the key. + * @param <W> The type of {@code Window} that this window function can be applied on. */ @Public public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable { @@ -45,5 +46,5 @@ public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function * * @throws Exception The function may throw exceptions to fail the program and trigger recovery. */ - void apply(KEY key, W window, IN input, Collector<OUT> out) throws Exception; + void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java index b830789..9b353fe 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java @@ -36,7 +36,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = getListFactory(); - private final WindowFunction<Iterable<Type>, Result, Key, Window> function; + private final WindowFunction<Type, Result, Key, Window> function; /** * IMPORTANT: This value needs to start at one, so it is fresher than the value that new entries have (zero) */ @@ -44,7 +44,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed // ------------------------------------------------------------------------ - public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, WindowFunction<Iterable<Type>, Result, Key, Window> function) { + public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, WindowFunction<Type, Result, Key, Window> function) { this.keySelector = keySelector; this.function = function; } @@ -86,7 +86,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed static final class WindowFunctionTraversal<Key, Type, Result> implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> { - private final WindowFunction<Iterable<Type>, Result, Key, Window> function; + private final WindowFunction<Type, Result, Key, Window> function; private final UnionIterator<Type> unionIterator; @@ -99,7 +99,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed private Key currentKey; - WindowFunctionTraversal(WindowFunction<Iterable<Type>, Result, Key, Window> function, TimeWindow window, + WindowFunctionTraversal(WindowFunction<Type, Result, Key, Window> function, TimeWindow window, Collector<Result> out, AbstractStreamOperator<Result> contextOperator) { this.function = function; this.out = out; http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java index 2f0d4fe..9ea2949 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java @@ -33,13 +33,13 @@ import java.util.ArrayList; @Internal public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> - extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, ArrayList<IN>, WindowFunction<Iterable<IN>, OUT, KEY, TimeWindow>> { + extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, ArrayList<IN>, WindowFunction<IN, OUT, KEY, TimeWindow>> { private static final long serialVersionUID = 7305948082830843475L; public AccumulatingProcessingTimeWindowOperator( - WindowFunction<Iterable<IN>, OUT, KEY, TimeWindow> function, + WindowFunction<IN, OUT, KEY, TimeWindow> function, KeySelector<IN, KEY> keySelector, TypeSerializer<KEY> keySerializer, TypeSerializer<IN> valueSerializer, @@ -53,7 +53,7 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> @Override protected AccumulatingKeyedTimePanes<IN, KEY, OUT> createPanes(KeySelector<IN, KEY> keySelector, Function function) { @SuppressWarnings("unchecked") - WindowFunction<Iterable<IN>, OUT, KEY, Window> windowFunction = (WindowFunction<Iterable<IN>, OUT, KEY, Window>) function; + WindowFunction<IN, OUT, KEY, Window> windowFunction = (WindowFunction<IN, OUT, KEY, Window>) function; return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction); } http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java index 510ebb2..221367d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java @@ -49,7 +49,7 @@ public class EvictingNonKeyedWindowOperator<IN, OUT, W extends Window> extends N public EvictingNonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssigner, TypeSerializer<W> windowSerializer, WindowBufferFactory<? super IN, ? extends EvictingWindowBuffer<IN>> windowBufferFactory, - AllWindowFunction<Iterable<IN>, OUT, W> windowFunction, + AllWindowFunction<IN, OUT, W> windowFunction, Trigger<? super IN, ? super W> trigger, Evictor<? super IN, ? super W> evictor) { super(windowAssigner, windowSerializer, windowBufferFactory, windowFunction, trigger); http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java index cfab3d5..16ca488 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java @@ -27,12 +27,12 @@ import org.apache.flink.api.common.state.MergingState; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.evictors.Evictor; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import java.util.Collection; @@ -48,7 +48,7 @@ import static java.util.Objects.requireNonNull; * * @param <K> The type of key returned by the {@code KeySelector}. * @param <IN> The type of the incoming elements. - * @param <OUT> The type of elements emitted by the {@code WindowFunction}. + * @param <OUT> The type of elements emitted by the {@code InternalWindowFunction}. * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns. */ @Internal @@ -65,7 +65,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window KeySelector<IN, K> keySelector, TypeSerializer<K> keySerializer, StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor, - WindowFunction<Iterable<IN>, OUT, K, W> windowFunction, + InternalWindowFunction<Iterable<IN>, OUT, K, W> windowFunction, Trigger<? super IN, ? super W> trigger, Evictor<? super IN, ? super W> evictor) { super(windowAssigner, windowSerializer, keySelector, keySerializer, null, windowFunction, trigger); http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java index e42d7b4..95feadc 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java @@ -77,7 +77,7 @@ import static java.util.Objects.requireNonNull; */ @Internal public class NonKeyedWindowOperator<IN, OUT, W extends Window> - extends AbstractUdfStreamOperator<OUT, AllWindowFunction<Iterable<IN>, OUT, W>> + extends AbstractUdfStreamOperator<OUT, AllWindowFunction<IN, OUT, W>> implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable { private static final long serialVersionUID = 1L; @@ -146,7 +146,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window> public NonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssigner, TypeSerializer<W> windowSerializer, WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory, - AllWindowFunction<Iterable<IN>, OUT, W> windowFunction, + AllWindowFunction<IN, OUT, W> windowFunction, Trigger<? super IN, ? super W> trigger) { super(windowFunction); http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index 289492b..9b7b347 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -34,7 +34,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -47,6 +46,7 @@ import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTaskState; @@ -74,7 +74,7 @@ import static java.util.Objects.requireNonNull; * <p> * Each pane gets its own instance of the provided {@code Trigger}. This trigger determines when * the contents of the pane should be processed to emit results. When a trigger fires, - * the given {@link WindowFunction} is invoked to produce the results that are emitted for + * the given {@link InternalWindowFunction} is invoked to produce the results that are emitted for * the pane to which the {@code Trigger} belongs. * * <p> @@ -83,12 +83,12 @@ import static java.util.Objects.requireNonNull; * * @param <K> The type of key returned by the {@code KeySelector}. * @param <IN> The type of the incoming elements. - * @param <OUT> The type of elements emitted by the {@code WindowFunction}. + * @param <OUT> The type of elements emitted by the {@code InternalWindowFunction}. * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns. */ @Internal public class WindowOperator<K, IN, ACC, OUT, W extends Window> - extends AbstractUdfStreamOperator<OUT, WindowFunction<ACC, OUT, K, W>> + extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>> implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable { private static final long serialVersionUID = 1L; @@ -126,7 +126,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> // ------------------------------------------------------------------------ /** - * This is given to the {@code WindowFunction} for emitting elements with a given timestamp. + * This is given to the {@code InternalWindowFunction} for emitting elements with a given timestamp. */ protected transient TimestampedCollector<OUT> timestampedCollector; @@ -162,7 +162,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> KeySelector<IN, K> keySelector, TypeSerializer<K> keySerializer, StateDescriptor<? extends MergingState<IN, ACC>, ?> windowStateDescriptor, - WindowFunction<ACC, OUT, K, W> windowFunction, + InternalWindowFunction<ACC, OUT, K, W> windowFunction, Trigger<? super IN, ? super W> trigger) { super(windowFunction); http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java new file mode 100644 index 0000000..32318ea --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing.functions; + +import org.apache.flink.api.common.functions.IterationRuntimeContext; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.windowing.WindowFunction; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.Collector; + +/** + * Internal window function for wrapping a {@link WindowFunction} that takes an {@code Iterable} + * when the window state also is an {@code Iterable}. + */ +public final class InternalIterableWindowFunction<IN, OUT, KEY, W extends Window> extends InternalWindowFunction<Iterable<IN>, OUT, KEY, W> implements RichFunction { + private static final long serialVersionUID = 1L; + + protected WindowFunction<IN, OUT, KEY, W> wrappedFunction; + + public InternalIterableWindowFunction(WindowFunction<IN, OUT, KEY, W> wrappedFunction) { + this.wrappedFunction = wrappedFunction; + } + + @Override + public void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception { + wrappedFunction.apply(key, window, input, out); + } + + @Override + public void open(Configuration parameters) throws Exception { + FunctionUtils.openFunction(this.wrappedFunction, parameters); + } + + @Override + public void close() throws Exception { + FunctionUtils.closeFunction(this.wrappedFunction); + } + + @Override + public void setRuntimeContext(RuntimeContext t) { + FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, t); + } + + @Override + public RuntimeContext getRuntimeContext() { + throw new RuntimeException("This should never be called."); + } + + @Override + public IterationRuntimeContext getIterationRuntimeContext() { + throw new RuntimeException("This should never be called."); + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java new file mode 100644 index 0000000..fd10e5c --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing.functions; + +import org.apache.flink.api.common.functions.IterationRuntimeContext; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.windowing.WindowFunction; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.Collector; + +import java.util.Collections; + +/** + * Internal window function for wrapping a {@link WindowFunction} that takes an {@code Iterable} + * when the window state is a single value. + */ +public final class InternalSingleValueWindowFunction<IN, OUT, KEY, W extends Window> extends InternalWindowFunction<IN, OUT, KEY, W> implements RichFunction { + private static final long serialVersionUID = 1L; + + protected WindowFunction<IN, OUT, KEY, W> wrappedFunction; + + public InternalSingleValueWindowFunction(WindowFunction<IN, OUT, KEY, W> wrappedFunction) { + this.wrappedFunction = wrappedFunction; + } + + @Override + public void apply(KEY key, W window, IN input, Collector<OUT> out) throws Exception { + wrappedFunction.apply(key, window, Collections.singletonList(input), out); + } + + @Override + public void open(Configuration parameters) throws Exception { + FunctionUtils.openFunction(this.wrappedFunction, parameters); + } + + @Override + public void close() throws Exception { + FunctionUtils.closeFunction(this.wrappedFunction); + } + + @Override + public void setRuntimeContext(RuntimeContext t) { + FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, t); + } + + @Override + public RuntimeContext getRuntimeContext() { + throw new RuntimeException("This should never be called."); + } + + @Override + public IterationRuntimeContext getIterationRuntimeContext() { + throw new RuntimeException("This should never be called."); + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java new file mode 100644 index 0000000..e75f3be --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing.functions; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.Collector; + +import java.io.Serializable; + +/** + * Internal interface for functions that are evaluated over keyed (grouped) windows. + * + * @param <IN> The type of the input value. + * @param <OUT> The type of the output value. + * @param <KEY> The type of the key. + */ +public abstract class InternalWindowFunction<IN, OUT, KEY, W extends Window> implements Function, Serializable { + private static final long serialVersionUID = 1L; + + /** + * Evaluates the window and outputs none or several elements. + * + * @param key The key for which this window is evaluated. + * @param window The window that is being evaluated. + * @param input The elements in the window being evaluated. + * @param out A collector for emitting elements. + * + * @throws Exception The function may throw exceptions to fail the program and trigger recovery. + */ + public abstract void apply(KEY key, W window, IN input, Collector<OUT> out) throws Exception; +} http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java index 502198c..7a4d6f8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java @@ -518,7 +518,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase { DataStream<String> window = map .windowAll(GlobalWindows.create()) .trigger(PurgingTrigger.of(CountTrigger.of(5))) - .apply(new AllWindowFunction<Iterable<Tuple2<Integer, String>>, String, GlobalWindow>() { + .apply(new AllWindowFunction<Tuple2<Integer, String>, String, GlobalWindow>() { @Override public void apply(GlobalWindow window, Iterable<Tuple2<Integer, String>> values, http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java index b8d57a6..0b0ab9e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java @@ -70,9 +70,11 @@ public class FoldApplyWindowFunctionTest { @Override public void apply(Integer integer, TimeWindow window, - Integer input, + Iterable<Integer> input, Collector<Integer> out) throws Exception { - out.collect(input); + for (Integer in: input) { + out.collect(in); + } } } ); http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java index b6e51c6..dff1184 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java @@ -67,7 +67,7 @@ import static org.junit.Assert.*; public class AccumulatingAlignedProcessingTimeWindowOperatorTest { @SuppressWarnings("unchecked") - private final WindowFunction<Iterable<String>, String, String, TimeWindow> mockFunction = mock(WindowFunction.class); + private final WindowFunction<String, String, String, TimeWindow> mockFunction = mock(WindowFunction.class); @SuppressWarnings("unchecked") private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class); @@ -79,8 +79,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { } }; - private final WindowFunction<Iterable<Integer>, Integer, Integer, TimeWindow> validatingIdentityFunction = - new WindowFunction<Iterable<Integer>, Integer, Integer, TimeWindow>() + private final WindowFunction<Integer, Integer, Integer, TimeWindow> validatingIdentityFunction = + new WindowFunction<Integer, Integer, Integer, TimeWindow>() { @Override public void apply(Integer key, @@ -723,7 +723,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { // ------------------------------------------------------------------------ - private static class FailingFunction implements WindowFunction<Iterable<Integer>, Integer, Integer, TimeWindow> { + private static class FailingFunction implements WindowFunction<Integer, Integer, Integer, TimeWindow> { private final int failAfterElements; @@ -751,7 +751,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { // ------------------------------------------------------------------------ - private static class StatefulFunction extends RichWindowFunction<Iterable<Integer>, Integer, Integer, TimeWindow> { + private static class StatefulFunction extends RichWindowFunction<Integer, Integer, Integer, TimeWindow> { // we use a concurrent map here even though there is no concurrency, to // get "volatile" style access to entries http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java index 0583290..42f452c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java @@ -76,7 +76,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase DataStream<Tuple2<String, Integer>> window2 = source .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) - .apply(new AllWindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow>() { + .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() { private static final long serialVersionUID = 1L; @Override @@ -123,7 +123,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase DataStream<Tuple2<String, Integer>> window2 = source .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) - .apply(new AllWindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow>() { + .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() { private static final long serialVersionUID = 1L; @Override @@ -172,7 +172,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) .evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS))) - .apply(new AllWindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow>() { + .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() { private static final long serialVersionUID = 1L; @Override http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java index 2f1dce5..6af7ac4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java @@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.windowing.evictors.CountEvictor; import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; @@ -70,7 +71,7 @@ public class EvictingWindowOperatorTest { new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, - new ReduceIterableWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer()), + new InternalIterableWindowFunction<>(new ReduceIterableWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer())), CountTrigger.of(WINDOW_SLIDE), CountEvictor.of(WINDOW_SIZE)); @@ -141,7 +142,7 @@ public class EvictingWindowOperatorTest { new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, - new RichSumReducer<GlobalWindow>(closeCalled), + new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)), CountTrigger.of(WINDOW_SLIDE), CountEvictor.of(WINDOW_SIZE)); @@ -208,7 +209,7 @@ public class EvictingWindowOperatorTest { } } - public static class RichSumReducer<W extends Window> extends RichWindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, String, W> { + public static class RichSumReducer<W extends Window> extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> { private static final long serialVersionUID = 1L; private boolean openCalled = false; http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java index a5a8df3..f214941 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java @@ -74,7 +74,7 @@ public class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase DataStream<Tuple2<String, Integer>> window2 = source .keyBy(0) .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS)) - .apply(new WindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, Tuple, TimeWindow>() { + .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() { private static final long serialVersionUID = 1L; @Override @@ -117,7 +117,7 @@ public class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase DataStream<Tuple2<String, Integer>> window2 = source .keyBy(0) .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS)) - .apply(new WindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, Tuple, TimeWindow>() { + .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() { private static final long serialVersionUID = 1L; @Override @@ -165,7 +165,7 @@ public class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase DataStream<Tuple2<String, Integer>> window2 = source .timeWindowAll(Time.of(1000, TimeUnit.MILLISECONDS)) - .apply(new AllWindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow>() { + .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() { private static final long serialVersionUID = 1L; @Override http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java index c1111a0..a1f08ad 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java @@ -41,6 +41,8 @@ import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger; import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; @@ -140,7 +142,7 @@ public class WindowOperatorTest { new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, - new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(), + new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()), EventTimeTrigger.create()); operator.setInputType(inputType, new ExecutionConfig()); @@ -176,7 +178,7 @@ public class WindowOperatorTest { new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, - new RichSumReducer<TimeWindow>(), + new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()), EventTimeTrigger.create()); operator.setInputType(inputType, new ExecutionConfig()); @@ -271,7 +273,7 @@ public class WindowOperatorTest { new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, - new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(), + new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()), EventTimeTrigger.create()); operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); @@ -306,7 +308,7 @@ public class WindowOperatorTest { new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, - new RichSumReducer<TimeWindow>(), + new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()), EventTimeTrigger.create()); operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); @@ -344,7 +346,7 @@ public class WindowOperatorTest { new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, - new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(), + new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()), ContinuousEventTimeTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS))); operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); @@ -434,7 +436,7 @@ public class WindowOperatorTest { new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, - new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(), + new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()), PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE))); operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse( @@ -497,7 +499,7 @@ public class WindowOperatorTest { } - public static class RichSumReducer<W extends Window> extends RichWindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, String, W> { + public static class RichSumReducer<W extends Window> extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> { private static final long serialVersionUID = 1L; private boolean openCalled = false; http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java index c57da8a..30bb840 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java @@ -104,7 +104,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { DataStream<Tuple2<String, Integer>> window2 = source .keyBy(0) .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) - .apply(new WindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, Tuple, TimeWindow>() { + .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() { private static final long serialVersionUID = 1L; @Override @@ -112,7 +112,6 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { TimeWindow window, Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) throws Exception { - } }); @@ -153,7 +152,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { .keyBy(0) .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) - .apply(new WindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, Tuple, TimeWindow>() { + .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() { private static final long serialVersionUID = 1L; @Override @@ -204,7 +203,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) .evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS))) - .apply(new WindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, Tuple, TimeWindow>() { + .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() { private static final long serialVersionUID = 1L; @Override http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala index e36542e..3c91529 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala @@ -24,7 +24,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWStream} import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator} -import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction +import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction => JAllWindowFunction} +import org.apache.flink.streaming.api.scala.function.AllWindowFunction import org.apache.flink.streaming.api.windowing.evictors.Evictor import org.apache.flink.streaming.api.windowing.triggers.Trigger import org.apache.flink.streaming.api.windowing.windows.Window @@ -179,10 +180,10 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { * @return The data stream that is the result of applying the window function to the window. */ def apply[R: TypeInformation]( - function: AllWindowFunction[Iterable[T], R, W]): DataStream[R] = { + function: AllWindowFunction[T, R, W]): DataStream[R] = { val cleanedFunction = clean(function) - val javaFunction = new AllWindowFunction[java.lang.Iterable[T], R, W] { + val javaFunction = new JAllWindowFunction[T, R, W] { def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = { cleanedFunction(window, elements.asScala, out) } @@ -205,7 +206,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = { val cleanedFunction = clean(function) - val applyFunction = new AllWindowFunction[java.lang.Iterable[T], R, W] { + val applyFunction = new JAllWindowFunction[T, R, W] { def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = { cleanedFunction(window, elements.asScala, out) } @@ -228,8 +229,15 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { preAggregator: ReduceFunction[T], function: AllWindowFunction[T, R, W]): DataStream[R] = { + val cleanedFunction = clean(function) + val applyFunction = new JAllWindowFunction[T, R, W] { + def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = { + cleanedFunction(window, elements.asScala, out) + } + } + val returnType: TypeInformation[R] = implicitly[TypeInformation[R]] - asScalaStream(javaStream.apply(clean(preAggregator), clean(function), returnType)) + asScalaStream(javaStream.apply(clean(preAggregator), applyFunction, returnType)) } /** @@ -245,7 +253,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { */ def apply[R: TypeInformation]( preAggregator: (T, T) => T, - function: (W, T, Collector[R]) => Unit): DataStream[R] = { + function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = { if (function == null) { throw new NullPointerException("Reduce function must not be null.") } @@ -259,9 +267,9 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { } val cleanApply = clean(function) - val applyFunction = new AllWindowFunction[T, R, W] { - def apply(window: W, input: T, out: Collector[R]): Unit = { - cleanApply(window, input, out) + val applyFunction = new JAllWindowFunction[T, R, W] { + def apply(window: W, input: java.lang.Iterable[T], out: Collector[R]): Unit = { + cleanApply(window, input.asScala, out) } } @@ -285,11 +293,18 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { initialValue: R, preAggregator: FoldFunction[T, R], function: AllWindowFunction[R, R, W]): DataStream[R] = { + + val cleanedFunction = clean(function) + val applyFunction = new JAllWindowFunction[R, R, W] { + def apply(window: W, elements: java.lang.Iterable[R], out: Collector[R]): Unit = { + cleanedFunction(window, elements.asScala, out) + } + } asScalaStream(javaStream.apply( initialValue, clean(preAggregator), - clean(function), + applyFunction, implicitly[TypeInformation[R]])) } @@ -308,7 +323,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { def apply[R: TypeInformation]( initialValue: R, preAggregator: (R, T) => R, - function: (W, R, Collector[R]) => Unit): DataStream[R] = { + function: (W, Iterable[R], Collector[R]) => Unit): DataStream[R] = { if (function == null) { throw new NullPointerException("Reduce function must not be null.") } @@ -322,9 +337,9 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { } val cleanApply = clean(function) - val applyFunction = new AllWindowFunction[R, R, W] { - def apply(window: W, input: R, out: Collector[R]): Unit = { - cleanApply(window, input, out) + val applyFunction = new JAllWindowFunction[R, R, W] { + def apply(window: W, input: java.lang.Iterable[R], out: Collector[R]): Unit = { + cleanApply(window, input.asScala, out) } } val returnType: TypeInformation[R] = implicitly[TypeInformation[R]] http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala index 53f033c..b7f9e00 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala @@ -24,7 +24,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.streaming.api.datastream.{WindowedStream => JavaWStream} import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator} -import org.apache.flink.streaming.api.functions.windowing.WindowFunction +import org.apache.flink.streaming.api.scala.function.WindowFunction +import org.apache.flink.streaming.api.functions.windowing.{WindowFunction => JWindowFunction} import org.apache.flink.streaming.api.windowing.evictors.Evictor import org.apache.flink.streaming.api.windowing.triggers.Trigger import org.apache.flink.streaming.api.windowing.windows.Window @@ -182,10 +183,10 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { * @return The data stream that is the result of applying the window function to the window. */ def apply[R: TypeInformation]( - function: WindowFunction[Iterable[T], R, K, W]): DataStream[R] = { + function: WindowFunction[T, R, K, W]): DataStream[R] = { val cleanFunction = clean(function) - val javaFunction = new WindowFunction[java.lang.Iterable[T], R, K, W] { + val javaFunction = new JWindowFunction[T, R, K, W] { def apply(key: K, window: W, input: java.lang.Iterable[T], out: Collector[R]) = { cleanFunction.apply(key, window, input.asScala, out) } @@ -212,7 +213,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { } val cleanedFunction = clean(function) - val applyFunction = new WindowFunction[java.lang.Iterable[T], R, K, W] { + val applyFunction = new JWindowFunction[T, R, K, W] { def apply(key: K, window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = { cleanedFunction(key, window, elements.asScala, out) } @@ -235,8 +236,16 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { preAggregator: ReduceFunction[T], function: WindowFunction[T, R, K, W]): DataStream[R] = { + val cleanedFunction = clean(function) + + val applyFunction = new JWindowFunction[T, R, K, W] { + def apply(key: K, window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = { + cleanedFunction.apply(key, window, elements.asScala, out) + } + } + val resultType: TypeInformation[R] = implicitly[TypeInformation[R]] - asScalaStream(javaStream.apply(clean(preAggregator), clean(function), resultType)) + asScalaStream(javaStream.apply(clean(preAggregator), applyFunction, resultType)) } /** @@ -252,7 +261,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { */ def apply[R: TypeInformation]( preAggregator: (T, T) => T, - function: (K, W, T, Collector[R]) => Unit): DataStream[R] = { + function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] = { if (function == null) { throw new NullPointerException("Reduce function must not be null.") @@ -267,9 +276,9 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { } val cleanApply = clean(function) - val applyFunction = new WindowFunction[T, R, K, W] { - def apply(key: K, window: W, input: T, out: Collector[R]): Unit = { - cleanApply(key, window, input, out) + val applyFunction = new JWindowFunction[T, R, K, W] { + def apply(key: K, window: W, input: java.lang.Iterable[T], out: Collector[R]): Unit = { + cleanApply(key, window, input.asScala, out) } } @@ -292,11 +301,19 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { initialValue: R, foldFunction: FoldFunction[T, R], function: WindowFunction[R, R, K, W]): DataStream[R] = { - + + val cleanedFunction = clean(function) + + val applyFunction = new JWindowFunction[R, R, K, W] { + def apply(key: K, window: W, elements: java.lang.Iterable[R], out: Collector[R]): Unit = { + cleanedFunction.apply(key, window, elements.asScala, out) + } + } + asScalaStream(javaStream.apply( initialValue, clean(foldFunction), - clean(function), + applyFunction, implicitly[TypeInformation[R]])) } @@ -314,7 +331,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { def apply[R: TypeInformation]( initialValue: R, foldFunction: (R, T) => R, - function: (K, W, R, Collector[R]) => Unit): DataStream[R] = { + function: (K, W, Iterable[R], Collector[R]) => Unit): DataStream[R] = { if (function == null) { throw new NullPointerException("Fold function must not be null.") @@ -329,9 +346,9 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { } val cleanApply = clean(function) - val applyFunction = new WindowFunction[R, R, K, W] { - def apply(key: K, window: W, input: R, out: Collector[R]): Unit = { - cleanApply(key, window, input, out) + val applyFunction = new JWindowFunction[R, R, K, W] { + def apply(key: K, window: W, input: java.lang.Iterable[R], out: Collector[R]): Unit = { + cleanApply(key, window, input.asScala, out) } } val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
