Repository: flink Updated Branches: refs/heads/master 6f3723e83 -> 5983069fc
[FLINK-5743] Mark WindowedStream.aggregate() methods as PublicEvolving Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5983069f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5983069f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5983069f Branch: refs/heads/master Commit: 5983069fc2a40492b514ffa53a508cd8992c6bf2 Parents: 6f3723e Author: Aljoscha Krettek <[email protected]> Authored: Fri Feb 24 11:27:08 2017 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Fri Feb 24 14:00:25 2017 +0100 ---------------------------------------------------------------------- .../api/datastream/AllWindowedStream.java | 4 +++ .../api/datastream/WindowedStream.java | 4 +++ .../streaming/api/scala/AllWindowedStream.scala | 19 ++++++++------ .../streaming/api/scala/WindowedStream.scala | 26 +++++++++++--------- 4 files changed, 34 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5983069f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java index f883ef5..742a2ed 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java @@ -303,6 +303,7 @@ public class AllWindowedStream<T, W extends Window> { * @param <R> The type of the elements in the resulting stream, equal to the * AggregateFunction's result type */ + @PublicEvolving public <ACC, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, R> function) { checkNotNull(function, "function"); @@ -331,6 +332,7 @@ public class AllWindowedStream<T, W extends Window> { * @param <R> The type of the elements in the resulting stream, equal to the * AggregateFunction's result type */ + @PublicEvolving public <ACC, R> SingleOutputStreamOperator<R> aggregate( AggregateFunction<T, ACC, R> function, TypeInformation<ACC> accumulatorType, @@ -366,6 +368,7 @@ public class AllWindowedStream<T, W extends Window> { * @param <R> The type of the elements in the resulting stream, equal to the * WindowFunction's result type */ + @PublicEvolving public <ACC, V, R> SingleOutputStreamOperator<R> aggregate( AggregateFunction<T, ACC, V> aggFunction, AllWindowFunction<V, R, W> windowFunction) { @@ -405,6 +408,7 @@ public class AllWindowedStream<T, W extends Window> { * @param <R> The type of the elements in the resulting stream, equal to the * WindowFunction's result type */ + @PublicEvolving public <ACC, V, R> SingleOutputStreamOperator<R> aggregate( AggregateFunction<T, ACC, V> aggregateFunction, AllWindowFunction<V, R, W> windowFunction, http://git-wip-us.apache.org/repos/asf/flink/blob/5983069f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index b28434c..164e47e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -737,6 +737,7 @@ public class WindowedStream<T, K, W extends Window> { * @param <R> The type of the elements in the resulting stream, equal to the * AggregateFunction's result type */ + @PublicEvolving public <ACC, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, R> function) { checkNotNull(function, "function"); @@ -765,6 +766,7 @@ public class WindowedStream<T, K, W extends Window> { * @param <R> The type of the elements in the resulting stream, equal to the * AggregateFunction's result type */ + @PublicEvolving public <ACC, R> SingleOutputStreamOperator<R> aggregate( AggregateFunction<T, ACC, R> function, TypeInformation<ACC> accumulatorType, @@ -800,6 +802,7 @@ public class WindowedStream<T, K, W extends Window> { * @param <R> The type of the elements in the resulting stream, equal to the * WindowFunction's result type */ + @PublicEvolving public <ACC, V, R> SingleOutputStreamOperator<R> aggregate( AggregateFunction<T, ACC, V> aggFunction, WindowFunction<V, R, K, W> windowFunction) { @@ -839,6 +842,7 @@ public class WindowedStream<T, K, W extends Window> { * @param <R> The type of the elements in the resulting stream, equal to the * WindowFunction's result type */ + @PublicEvolving public <ACC, V, R> SingleOutputStreamOperator<R> aggregate( AggregateFunction<T, ACC, V> aggregateFunction, WindowFunction<V, R, K, W> windowFunction, http://git-wip-us.apache.org/repos/asf/flink/blob/5983069f/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala index 7f52252..cf062fc 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala @@ -209,8 +209,9 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { * @param aggregateFunction The aggregation function. * @return The data stream that is the result of applying the fold function to the window. */ - def aggregate[ACC: TypeInformation, R: TypeInformation] - (aggregateFunction: AggregateFunction[T, ACC, R]): DataStream[R] = { + @PublicEvolving + def aggregate[ACC: TypeInformation, R: TypeInformation]( + aggregateFunction: AggregateFunction[T, ACC, R]): DataStream[R] = { checkNotNull(aggregateFunction, "AggregationFunction must not be null") @@ -232,9 +233,10 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { * @param windowFunction The window function. * @return The data stream that is the result of applying the window function to the window. */ - def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation] - (preAggregator: AggregateFunction[T, ACC, V], - windowFunction: AllWindowFunction[V, R, W]): DataStream[R] = { + @PublicEvolving + def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation]( + preAggregator: AggregateFunction[T, ACC, V], + windowFunction: AllWindowFunction[V, R, W]): DataStream[R] = { checkNotNull(preAggregator, "AggregationFunction must not be null") checkNotNull(windowFunction, "Window function must not be null") @@ -264,9 +266,10 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { * @param windowFunction The window function. * @return The data stream that is the result of applying the window function to the window. */ - def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation] - (preAggregator: AggregateFunction[T, ACC, V], - windowFunction: (W, Iterable[V], Collector[R]) => Unit): DataStream[R] = { + @PublicEvolving + def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation]( + preAggregator: AggregateFunction[T, ACC, V], + windowFunction: (W, Iterable[V], Collector[R]) => Unit): DataStream[R] = { checkNotNull(preAggregator, "AggregationFunction must not be null") checkNotNull(windowFunction, "Window function must not be null") http://git-wip-us.apache.org/repos/asf/flink/blob/5983069f/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala index a5fbeb9..32a9f60 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala @@ -258,8 +258,9 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { * @param aggregateFunction The aggregation function. * @return The data stream that is the result of applying the fold function to the window. */ - def aggregate[ACC: TypeInformation, R: TypeInformation] - (aggregateFunction: AggregateFunction[T, ACC, R]): DataStream[R] = { + @PublicEvolving + def aggregate[ACC: TypeInformation, R: TypeInformation]( + aggregateFunction: AggregateFunction[T, ACC, R]): DataStream[R] = { val accumulatorType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]] val resultType: TypeInformation[R] = implicitly[TypeInformation[R]] @@ -279,9 +280,10 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { * @param windowFunction The window function. * @return The data stream that is the result of applying the window function to the window. */ - def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation] - (preAggregator: AggregateFunction[T, ACC, V], - windowFunction: WindowFunction[V, R, K, W]): DataStream[R] = { + @PublicEvolving + def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation]( + preAggregator: AggregateFunction[T, ACC, V], + windowFunction: WindowFunction[V, R, K, W]): DataStream[R] = { val cleanedPreAggregator = clean(preAggregator) val cleanedWindowFunction = clean(windowFunction) @@ -308,9 +310,10 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { * @param windowFunction The window function. * @return The data stream that is the result of applying the window function to the window. */ - def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation] - (preAggregator: AggregateFunction[T, ACC, V], - windowFunction: (K, W, Iterable[V], Collector[R]) => Unit): DataStream[R] = { + @PublicEvolving + def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation]( + preAggregator: AggregateFunction[T, ACC, V], + windowFunction: (K, W, Iterable[V], Collector[R]) => Unit): DataStream[R] = { val cleanedPreAggregator = clean(preAggregator) val cleanedWindowFunction = clean(windowFunction) @@ -337,9 +340,10 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { * @param windowFunction The window function. * @return The data stream that is the result of applying the window function to the window. */ - def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation] - (preAggregator: AggregateFunction[T, ACC, V], - windowFunction: ProcessWindowFunction[V, R, K, W]): DataStream[R] = { + @PublicEvolving + def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation]( + preAggregator: AggregateFunction[T, ACC, V], + windowFunction: ProcessWindowFunction[V, R, K, W]): DataStream[R] = { val cleanedPreAggregator = clean(preAggregator) val cleanedWindowFunction = clean(windowFunction)
