[FLINK-5720] Deprecate DataStream#fold()
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e5adf113 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e5adf113 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e5adf113 Branch: refs/heads/release-1.3 Commit: e5adf11342337994ea9da3f50ce7b587086bf820 Parents: 35b74f2 Author: zentol <[email protected]> Authored: Wed May 3 15:49:03 2017 +0200 Committer: zentol <[email protected]> Committed: Tue May 9 21:07:33 2017 +0200 ---------------------------------------------------------------------- .../streaming/state/RocksDBFoldingState.java | 3 +++ .../flink/api/common/functions/FoldFunction.java | 3 +++ .../api/common/functions/RichFoldFunction.java | 3 +++ .../api/common/functions/RuntimeContext.java | 3 +++ .../flink/api/common/state/FoldingState.java | 3 +++ .../api/common/state/FoldingStateDescriptor.java | 3 +++ .../flink/api/common/state/KeyedStateStore.java | 5 ++++- .../flink/api/common/state/StateBinder.java | 3 +++ .../flink/api/java/typeutils/TypeExtractor.java | 8 ++++++++ .../runtime/state/AbstractKeyedStateBackend.java | 5 ++++- .../runtime/state/heap/HeapFoldingState.java | 7 +++++-- .../state/internal/InternalFoldingState.java | 3 +++ .../api/datastream/AllWindowedStream.java | 18 ++++++++++++++++++ .../streaming/api/datastream/KeyedStream.java | 6 ++++++ .../streaming/api/datastream/WindowedStream.java | 18 ++++++++++++++++++ .../windowing/FoldApplyAllWindowFunction.java | 3 +++ .../FoldApplyProcessAllWindowFunction.java | 3 +++ .../windowing/FoldApplyProcessWindowFunction.java | 3 +++ .../windowing/FoldApplyWindowFunction.java | 3 +++ .../api/operators/StreamGroupedFold.java | 3 +++ .../streaming/api/scala/AllWindowedStream.scala | 10 ++++++++-- .../flink/streaming/api/scala/KeyedStream.scala | 3 +++ .../streaming/api/scala/WindowedStream.scala | 8 +++++++- 23 files changed, 120 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java index 26dc3dd..d5d9fce 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java @@ -39,7 +39,10 @@ import java.io.IOException; * @param <N> The type of the namespace. * @param <T> The type of the values that can be folded into the state. * @param <ACC> The type of the value in the folding state. + * + * @deprecated will be removed in a future version */ +@Deprecated public class RocksDBFoldingState<K, N, T, ACC> extends AbstractRocksDBState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, ACC> implements InternalFoldingState<N, T, ACC> { http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java index b52828e..b3fd700 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java @@ -38,8 +38,11 @@ import java.io.Serializable; * * @param <T> Type of the initial input and the returned element * @param <O> Type of the elements that the group/list/stream contains + * + * @deprecated use {@link AggregateFunction} instead */ @Public +@Deprecated public interface FoldFunction<O, T> extends Function, Serializable { /** * The core method of FoldFunction, combining two values into one value of the same type. http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java index 245550d..516e1b4 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java @@ -28,8 +28,11 @@ import org.apache.flink.annotation.Public; * * @param <T> Type of the initial input and the returned element * @param <O> Type of the elements that the group/list/stream contains + * + *@deprecated use {@link RichAggregateFunction} instead */ @Public +@Deprecated public abstract class RichFoldFunction<O, T> extends AbstractRichFunction implements FoldFunction<O, T> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java index 2978f3a..38155f6 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java @@ -394,8 +394,11 @@ public interface RuntimeContext { * * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the * function (function is not part of a KeyedStream). + * + * @deprecated will be removed in a future version */ @PublicEvolving + @Deprecated <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties); /** http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java index 684a612..7e45399 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java @@ -35,6 +35,9 @@ import org.apache.flink.annotation.PublicEvolving; * * @param <T> Type of the values folded into the state * @param <ACC> Type of the value in the state + * + * @deprecated will be removed in a future version */ @PublicEvolving +@Deprecated public interface FoldingState<T, ACC> extends AppendingState<T, ACC> {} http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java index 73bfaa8..f7609c3 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java @@ -32,8 +32,11 @@ import static java.util.Objects.requireNonNull; * * @param <T> Type of the values folded int othe state * @param <ACC> Type of the value in the state + * + * @deprecated will be removed in a future version in favor of {@link AggregatingStateDescriptor} */ @PublicEvolving +@Deprecated public class FoldingStateDescriptor<T, ACC> extends StateDescriptor<FoldingState<T, ACC>, ACC> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java index 2187f6c..ea9ac41 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java @@ -193,8 +193,11 @@ public interface KeyedStateStore { * * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the * function (function is not part of a KeyedStream). + * + * @deprecated will be removed in a future version */ @PublicEvolving + @Deprecated <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties); /** @@ -236,4 +239,4 @@ public interface KeyedStateStore { */ @PublicEvolving <UK, UV> MapState<UK,UV> getMapState(MapStateDescriptor<UK, UV> stateProperties); -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java index 9df7a47..a373923 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java @@ -68,7 +68,10 @@ public interface StateBinder { * * @param <T> Type of the values folded into the state * @param <ACC> Type of the value in the state + * + * @deprecated will be removed in a future version */ + @Deprecated <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception; /** http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index a5f236f..f1bf957 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -177,13 +177,21 @@ public class TypeExtractor { return getUnaryOperatorReturnType((Function) flatMapInterface, FlatMapFunction.class, false, true, inType, functionName, allowMissing); } + /** + * @deprecated will be removed in a future version + */ @PublicEvolving + @Deprecated public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(FoldFunction<IN, OUT> foldInterface, TypeInformation<IN> inType) { return getFoldReturnTypes(foldInterface, inType, null, false); } + /** + * @deprecated will be removed in a future version + */ @PublicEvolving + @Deprecated public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(FoldFunction<IN, OUT> foldInterface, TypeInformation<IN> inType, String functionName, boolean allowMissing) { return getUnaryOperatorReturnType((Function) foldInterface, FoldFunction.class, false, false, inType, functionName, allowMissing); http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java index 47ebe3b..2b225df 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java @@ -195,8 +195,11 @@ public abstract class AbstractKeyedStateBackend<K> * * @param <N> The type of the namespace. * @param <T> Type of the values folded into the state - * @param <ACC> Type of the value in the state * + * @param <ACC> Type of the value in the state + * + * @deprecated will be removed in a future version */ + @Deprecated protected abstract <N, T, ACC> InternalFoldingState<N, T, ACC> createFoldingState( TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception; http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java index dad6d0d..3a77cca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java @@ -36,7 +36,10 @@ import java.io.IOException; * @param <N> The type of the namespace. * @param <T> The type of the values that can be folded into the state. * @param <ACC> The type of the value in the folding state. + * + * @deprecated will be removed in a future version */ +@Deprecated public class HeapFoldingState<K, N, T, ACC> extends AbstractHeapState<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>> implements InternalFoldingState<N, T, ACC> { @@ -84,7 +87,7 @@ public class HeapFoldingState<K, N, T, ACC> } } - static final class FoldTransformation<T, ACC> implements StateTransformationFunction<ACC, T> { + private static final class FoldTransformation<T, ACC> implements StateTransformationFunction<ACC, T> { private final FoldingStateDescriptor<T, ACC> stateDescriptor; private final FoldFunction<T, ACC> foldFunction; @@ -99,4 +102,4 @@ public class HeapFoldingState<K, N, T, ACC> return foldFunction.fold((previousState != null) ? previousState : stateDescriptor.getDefaultValue(), value); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java index eb58ce5..4ef258f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java @@ -28,5 +28,8 @@ import org.apache.flink.api.common.state.FoldingState; * @param <N> The type of the namespace * @param <T> Type of the values folded into the state * @param <ACC> Type of the value in the state + * + * @deprecated will be removed in a future version */ +@Deprecated public interface InternalFoldingState<N, T, ACC> extends InternalAppendingState<N, T, ACC>, FoldingState<T, ACC> {} http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java index 0d953a9..7ea65fc 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java @@ -754,7 +754,10 @@ public class AllWindowedStream<T, W extends Window> { * * @param function The fold function. * @return The data stream that is the result of applying the fold function to the window. + * + * @deprecated use {@link #aggregate(AggregateFunction)} instead */ + @Deprecated public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function) { if (function instanceof RichFunction) { throw new UnsupportedOperationException("FoldFunction of fold can not be a RichFunction. " + @@ -774,7 +777,10 @@ public class AllWindowedStream<T, W extends Window> { * * @param function The fold function. * @return The data stream that is the result of applying the fold function to the window. + * + * @deprecated use {@link #aggregate(AggregateFunction, TypeInformation, TypeInformation)} instead */ + @Deprecated public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) { if (function instanceof RichFunction) { throw new UnsupportedOperationException("FoldFunction of fold can not be a RichFunction. " + @@ -795,8 +801,11 @@ public class AllWindowedStream<T, W extends Window> { * @param foldFunction The fold function that is used for incremental aggregation. * @param function The window function. * @return The data stream that is the result of applying the window function to the window. + * + * @deprecated use {@link #aggregate(AggregateFunction, ProcessAllWindowFunction)} instead */ @PublicEvolving + @Deprecated public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, AllWindowFunction<ACC, R, W> function) { TypeInformation<ACC> foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(), @@ -821,8 +830,11 @@ public class AllWindowedStream<T, W extends Window> { * @param foldAccumulatorType Type information for the result type of the fold function * @param resultType Type information for the result type of the window function * @return The data stream that is the result of applying the window function to the window. + * + * @deprecated use {@link #aggregate(AggregateFunction, AllWindowFunction, TypeInformation, TypeInformation, TypeInformation)} instead */ @PublicEvolving + @Deprecated public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, AllWindowFunction<ACC, R, W> function, @@ -901,8 +913,11 @@ public class AllWindowedStream<T, W extends Window> { * @param foldFunction The fold function that is used for incremental aggregation. * @param function The window function. * @return The data stream that is the result of applying the window function to the window. + * + * @deprecated use {@link #aggregate(AggregateFunction, ProcessAllWindowFunction)} instead */ @PublicEvolving + @Deprecated public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessAllWindowFunction<ACC, R, W> function) { TypeInformation<ACC> foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(), @@ -927,8 +942,11 @@ public class AllWindowedStream<T, W extends Window> { * @param foldAccumulatorType Type information for the result type of the fold function * @param resultType Type information for the result type of the window function * @return The data stream that is the result of applying the window function to the window. + * + * @deprecated use {@link #aggregate(AggregateFunction, ProcessAllWindowFunction, TypeInformation, TypeInformation, TypeInformation)} instead */ @PublicEvolving + @Deprecated public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessAllWindowFunction<ACC, R, W> function, http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java index 9334c66..e3171c3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java @@ -416,7 +416,10 @@ public class KeyedStream<T, KEY> extends DataStream<T> { * @param initialValue * The initialValue passed to the folders for each key. * @return The transformed DataStream. + * + * @deprecated will be removed in a future version */ + @Deprecated public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> folder) { TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes( @@ -748,8 +751,11 @@ public class KeyedStream<T, KEY> extends DataStream<T> { * @param queryableStateName Name under which to the publish the queryable state instance * @param stateDescriptor State descriptor to create state instance from * @return Queryable state instance + * + * @deprecated will be removed in a future version */ @PublicEvolving + @Deprecated public <ACC> QueryableStateStream<KEY, ACC> asQueryableState( String queryableStateName, FoldingStateDescriptor<T, ACC> stateDescriptor) { http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index 2d7dafe..7913e95 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -487,7 +487,10 @@ public class WindowedStream<T, K, W extends Window> { * * @param function The fold function. * @return The data stream that is the result of applying the fold function to the window. + * + * @deprecated use {@link #aggregate(AggregationFunction)} instead */ + @Deprecated public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function) { if (function instanceof RichFunction) { throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. " + @@ -507,7 +510,10 @@ public class WindowedStream<T, K, W extends Window> { * * @param function The fold function. * @return The data stream that is the result of applying the fold function to the window. + * + * @deprecated use {@link #aggregate(AggregateFunction, TypeInformation, TypeInformation)} instead */ + @Deprecated public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) { if (function instanceof RichFunction) { throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. " + @@ -528,8 +534,11 @@ public class WindowedStream<T, K, W extends Window> { * @param foldFunction The fold function that is used for incremental aggregation. * @param function The window function. * @return The data stream that is the result of applying the window function to the window. + * + * @deprecated use {@link #aggregate(AggregateFunction, WindowFunction)} instead */ @PublicEvolving + @Deprecated public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, WindowFunction<ACC, R, K, W> function) { TypeInformation<ACC> foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(), @@ -554,8 +563,11 @@ public class WindowedStream<T, K, W extends Window> { * @param foldAccumulatorType Type information for the result type of the fold function * @param resultType Type information for the result type of the window function * @return The data stream that is the result of applying the window function to the window. + * + * @deprecated use {@link #aggregate(AggregateFunction, ProcessWindowFunction, TypeInformation, TypeInformation, TypeInformation)} instead */ @PublicEvolving + @Deprecated public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, WindowFunction<ACC, R, K, W> function, @@ -638,8 +650,11 @@ public class WindowedStream<T, K, W extends Window> { * @param foldFunction The fold function that is used for incremental aggregation. * @param windowFunction The window function. * @return The data stream that is the result of applying the window function to the window. + * + * @deprecated use {@link #aggregate(AggregateFunction, WindowFunction)} instead */ @PublicEvolving + @Deprecated public <R, ACC> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessWindowFunction<ACC, R, K, W> windowFunction) { if (foldFunction instanceof RichFunction) { throw new UnsupportedOperationException("FoldFunction can not be a RichFunction."); @@ -667,7 +682,10 @@ public class WindowedStream<T, K, W extends Window> { * @param windowFunction The process window function. * @param windowResultType The process window function result type. * @return The data stream that is the result of applying the fold function to the window. + * + * @deprecated use {@link #aggregate(AggregateFunction, WindowFunction, TypeInformation, TypeInformation, TypeInformation)} instead */ + @Deprecated @Internal public <R, ACC> SingleOutputStreamOperator<R> fold( ACC initialValue, http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java index 30662f0..2069f7a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java @@ -37,8 +37,11 @@ import org.apache.flink.util.Collector; /** * Internal {@link AllWindowFunction} that is used for implementing a fold on a window configuration * that only allows {@link AllWindowFunction} and cannot directly execute a {@link FoldFunction}. + * + * @deprecated will be removed in a future version */ @Internal +@Deprecated public class FoldApplyAllWindowFunction<W extends Window, T, ACC, R> extends WrappingFunction<AllWindowFunction<ACC, R, W>> implements AllWindowFunction<T, R, W>, OutputTypeConfigurable<R> { http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java index 38244dd..8982c71 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java @@ -39,8 +39,11 @@ import org.apache.flink.util.Collector; * Internal {@link ProcessAllWindowFunction} that is used for implementing a fold on a window * configuration that only allows {@link ProcessAllWindowFunction} and cannot directly execute a * {@link FoldFunction}. + * + * @deprecated will be removed in a future version */ @Internal +@Deprecated public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R> extends RichProcessAllWindowFunction<T, R, W> implements OutputTypeConfigurable<R> { http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java index 1b2c2e2..0e0356a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java @@ -39,8 +39,11 @@ import org.apache.flink.util.Collector; * Internal {@link ProcessWindowFunction} that is used for implementing a fold on a window * configuration that only allows {@link ProcessWindowFunction} and cannot directly execute a * {@link FoldFunction}. + * + * @deprecated will be removed in a future version */ @Internal +@Deprecated public class FoldApplyProcessWindowFunction<K, W extends Window, T, ACC, R> extends RichProcessWindowFunction<T, R, K, W> implements OutputTypeConfigurable<R> { http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java index 770deb0..865dbc9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java @@ -37,8 +37,11 @@ import org.apache.flink.util.Collector; /** * Internal {@link WindowFunction} that is used for implementing a fold on a window configuration * that only allows {@link WindowFunction} and cannot directly execute a {@link FoldFunction}. + * + * @deprecated will be removed in a future version */ @Internal +@Deprecated public class FoldApplyWindowFunction<K, W extends Window, T, ACC, R> extends WrappingFunction<WindowFunction<ACC, R, K, W>> implements WindowFunction<T, R, K, W>, OutputTypeConfigurable<R> { http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java index 1ed7178..07c5c90 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java @@ -35,8 +35,11 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; /** * A {@link StreamOperator} for executing a {@link FoldFunction} on a * {@link org.apache.flink.streaming.api.datastream.KeyedStream}. + * + * @deprecated will be removed in a future version */ @Internal +@Deprecated public class StreamGroupedFold<IN, OUT, KEY> extends AbstractUdfStreamOperator<OUT, FoldFunction<IN, OUT>> implements OneInputStreamOperator<IN, OUT>, OutputTypeConfigurable<OUT> { http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala index 757e45f..bbdcf4a 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala @@ -401,7 +401,8 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { * * @param function The fold function. * @return The data stream that is the result of applying the fold function to the window. - */ + */ + @deprecated("use [[aggregate()]] instead") def fold[R: TypeInformation]( initialValue: R, function: FoldFunction[T,R]): DataStream[R] = { @@ -421,7 +422,8 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { * * @param function The fold function. * @return The data stream that is the result of applying the fold function to the window. - */ + */ + @deprecated("use [[aggregate()]] instead") def fold[R: TypeInformation](initialValue: R)(function: (R, T) => R): DataStream[R] = { if (function == null) { throw new NullPointerException("Fold function must not be null.") @@ -444,6 +446,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { * @param windowFunction The window function. * @return The data stream that is the result of applying the window function to the window. */ + @deprecated("use [[aggregate()]] instead") def fold[ACC: TypeInformation, R: TypeInformation]( initialValue: ACC, preAggregator: FoldFunction[T, ACC], @@ -474,6 +477,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { * @param windowFunction The process window function. * @return The data stream that is the result of applying the window function to the window. */ + @deprecated("use [[aggregate()]] instead") @PublicEvolving def fold[ACC: TypeInformation, R: TypeInformation]( initialValue: ACC, @@ -505,6 +509,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { * @param windowFunction The window function. * @return The data stream that is the result of applying the window function to the window. */ + @deprecated("use [[aggregate()]] instead") def fold[ACC: TypeInformation, R: TypeInformation]( initialValue: ACC, preAggregator: (ACC, T) => ACC, @@ -540,6 +545,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { * @param windowFunction The window function. * @return The data stream that is the result of applying the window function to the window. */ + @deprecated("use [[aggregate()]] instead") @PublicEvolving def fold[ACC: TypeInformation, R: TypeInformation]( initialValue: ACC, http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala index d5ef89f..aaeb1ec 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala @@ -184,6 +184,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] * using an associative fold function and an initial value. An independent * aggregate is kept per key. */ + @deprecated("will be removed in a future version") def fold[R: TypeInformation](initialValue: R, folder: FoldFunction[T,R]): DataStream[R] = { if (folder == null) { @@ -201,6 +202,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] * using an associative fold function and an initial value. An independent * aggregate is kept per key. */ + @deprecated("will be removed in a future version") def fold[R: TypeInformation](initialValue: R)(fun: (R,T) => R): DataStream[R] = { if (fun == null) { throw new NullPointerException("Fold function must not be null.") @@ -507,6 +509,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] * @return Queryable state instance */ @PublicEvolving + @deprecated("will be removed in a future version") def asQueryableState[ACC]( queryableStateName: String, stateDescriptor: FoldingStateDescriptor[T, ACC]) : QueryableStateStream[K, ACC] = { http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala index 4e0e1a4..0f8a6e0 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala @@ -382,6 +382,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { * @param function The fold function. * @return The data stream that is the result of applying the fold function to the window. */ + @deprecated("use [[aggregate()]] instead") def fold[R: TypeInformation]( initialValue: R, function: FoldFunction[T,R]): DataStream[R] = { @@ -401,7 +402,8 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { * * @param function The fold function. * @return The data stream that is the result of applying the fold function to the window. - */ + */ + @deprecated("use [[aggregate()]] instead") def fold[R: TypeInformation](initialValue: R)(function: (R, T) => R): DataStream[R] = { if (function == null) { throw new NullPointerException("Fold function must not be null.") @@ -423,6 +425,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { * @param function The window function. * @return The data stream that is the result of applying the window function to the window. */ + @deprecated("use [[aggregate()]] instead") def fold[ACC: TypeInformation, R: TypeInformation]( initialValue: ACC, foldFunction: FoldFunction[T, ACC], @@ -452,6 +455,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { * @param windowFunction The window function. * @return The data stream that is the result of applying the window function to the window. */ + @deprecated("use [[aggregate()]] instead") def fold[ACC: TypeInformation, R: TypeInformation]( initialValue: ACC, foldFunction: (ACC, T) => ACC, @@ -486,6 +490,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { * @param function The process window function. * @return The data stream that is the result of applying the window function to the window. */ + @deprecated("use [[aggregate()]] instead") @PublicEvolving def fold[R: TypeInformation, ACC: TypeInformation]( initialValue: ACC, @@ -516,6 +521,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { * @param function The process window function. * @return The data stream that is the result of applying the window function to the window. */ + @deprecated("use [[aggregate()]] instead") @PublicEvolving def fold[R: TypeInformation, ACC: TypeInformation]( initialValue: ACC,
