This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 276332e11277e7e50789e4eaed0f11ae59a0f8d1
Author: Ghildiyal <manish.ghildi...@here.com>
AuthorDate: Sat May 16 16:48:38 2020 +0530

    [FLINK-17376] Remove deprecated state access methods
---
 docs/dev/stream/state/state.md                     | 10 +----
 docs/dev/stream/state/state.zh.md                  |  9 +----
 .../kafka/FlinkKafkaConsumerBaseTest.java          | 13 ------
 .../connectors/rabbitmq/RMQSourceTest.java         |  3 +-
 .../flink/api/common/functions/RuntimeContext.java | 46 ----------------------
 .../functions/util/AbstractRuntimeUDFContext.java  | 10 -----
 .../flink/api/common/state/OperatorStateStore.java | 38 ------------------
 .../flink/cep/operator/CepRuntimeContext.java      |  7 ----
 .../flink/cep/operator/CepRuntimeContextTest.java  | 13 ------
 .../state/api/runtime/SavepointRuntimeContext.java | 13 ------
 .../runtime/state/DefaultOperatorStateBackend.java | 25 ------------
 .../runtime/state/OperatorStateBackendTest.java    |  4 +-
 .../api/functions/async/RichAsyncFunction.java     |  7 ----
 .../api/operators/StreamingRuntimeContext.java     |  9 -----
 .../api/functions/async/RichAsyncFunctionTest.java | 13 ------
 .../api/operators/StreamingRuntimeContextTest.java | 28 -------------
 .../collect/utils/MockOperatorStateStore.java      | 11 ------
 .../StatefulJobSavepointMigrationITCase.scala      |  2 +-
 ...StatefulJobWBroadcastStateMigrationITCase.scala |  6 +--
 19 files changed, 10 insertions(+), 257 deletions(-)
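
Migration sketch (not part of the commit): the removed documentation points users of `FoldingState` to `AggregatingState` and `AggregatingStateDescriptor`. The block below illustrates how the fold from the removed Javadoc (initial value `0L`, summing `count()` into a `Long`) might be expressed as an aggregate function. To stay runnable without Flink on the classpath, it declares a local stand-in interface with the same four methods as Flink's `AggregateFunction`; `MyType`, `CountSum`, and `aggregate` are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.List;

public class FoldToAggregateSketch {

    /** Local stand-in with the same four methods as Flink's AggregateFunction (assumption: not the Flink type itself). */
    interface AggregateFunction<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    /** Hypothetical element type, loosely modeled on MyType from the removed Javadoc. */
    static final class MyType {
        private final long count;

        MyType(long count) {
            this.count = count;
        }

        long count() {
            return count;
        }
    }

    /** Sums MyType.count() into a Long; the fold's initial value 0L becomes createAccumulator(). */
    static final class CountSum implements AggregateFunction<MyType, Long, Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(MyType value, Long accumulator) {
            return accumulator + value.count();
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return a + b;
        }
    }

    /** Applies the function to a list of values by hand, only to show the call order. */
    static <IN, ACC, OUT> OUT aggregate(AggregateFunction<IN, ACC, OUT> fn, List<IN> values) {
        ACC acc = fn.createAccumulator();
        for (IN value : values) {
            acc = fn.add(value, acc);
        }
        return fn.getResult(acc);
    }

    public static void main(String[] args) {
        long sum = aggregate(new CountSum(), Arrays.asList(new MyType(2), new MyType(3), new MyType(5)));
        System.out.println(sum); // prints 10
    }
}
```

In an actual job, a function like `CountSum` would implement Flink's own `AggregateFunction`, be wrapped in an `AggregatingStateDescriptor`, and the state handle would come from `getAggregatingState(...)` on the `RuntimeContext`, as listed in the documentation touched by this commit.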

diff --git a/docs/dev/stream/state/state.md b/docs/dev/stream/state/state.md
index 728beab..389a03e 100644
--- a/docs/dev/stream/state/state.md
+++ b/docs/dev/stream/state/state.md
@@ -115,11 +115,6 @@ added to the state. Contrary to `ReducingState`, the aggregate type may be diffe
 of elements that are added to the state. The interface is the same as for `ListState` but elements
 added using `add(IN)` are aggregated using a specified `AggregateFunction`.
 
-* `FoldingState<T, ACC>`: This keeps a single value that represents the aggregation of all values
-added to the state. Contrary to `ReducingState`, the aggregate type may be different from the type
-of elements that are added to the state. The interface is similar to `ListState` but elements
-added using `add(T)` are folded into an aggregate using a specified `FoldFunction`.
-
 * `MapState<UK, UV>`: This keeps a list of mappings. You can put key-value pairs into the state and
 retrieve an `Iterable` over all currently stored mappings. Mappings are added using `put(UK, UV)` or
 `putAll(Map<UK, UV>)`. The value associated with a user key can be retrieved using `get(UK)`. The iterable
@@ -129,8 +124,6 @@ You can also use `isEmpty()` to check whether this map contains any key-value ma
 All types of state also have a method `clear()` that clears the state for the currently
 active key, i.e. the key of the input element.
 
-<span class="label label-danger">Attention</span> `FoldingState` and `FoldingStateDescriptor` have been deprecated in Flink 1.4 and will be completely removed in the future. Please use `AggregatingState` and `AggregatingStateDescriptor` instead.
-
 It is important to keep in mind that these state objects are only used for interfacing
 with state. The state is not necessarily stored inside but might reside on disk or somewhere else.
 The second thing to keep in mind is that the value you get from the state
@@ -142,7 +135,7 @@ To get a state handle, you have to create a `StateDescriptor`. This holds the na
 that you can reference them), the type of the values that the state holds, and possibly
 a user-specified function, such as a `ReduceFunction`. Depending on what type of state you
 want to retrieve, you create either a `ValueStateDescriptor`, a `ListStateDescriptor`,
-a `ReducingStateDescriptor`, a `FoldingStateDescriptor` or a `MapStateDescriptor`.
+a `ReducingStateDescriptor`, or a `MapStateDescriptor`.
 
 State is accessed using the `RuntimeContext`, so it is only possible in *rich functions*.
 Please see [here]({% link dev/user_defined_functions.md %}#rich-functions) for
@@ -153,7 +146,6 @@ is available in a `RichFunction` has these methods for accessing state:
 * `ReducingState<T> getReducingState(ReducingStateDescriptor<T>)`
 * `ListState<T> getListState(ListStateDescriptor<T>)`
 * `AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)`
-* `FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)`
 * `MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)`
 
 This is an example `FlatMapFunction` that shows how all of the parts fit together:
diff --git a/docs/dev/stream/state/state.zh.md b/docs/dev/stream/state/state.zh.md
index 5fe4825..16e5e96 100644
--- a/docs/dev/stream/state/state.zh.md
+++ b/docs/dev/stream/state/state.zh.md
@@ -107,23 +107,17 @@ keyed state 接口提供不同类型状态的访问接口,这些状态都作
 * `AggregatingState<IN, OUT>`: 保留一个单值,表示添加到状态的所有值的聚合。和 `ReducingState` 相反的是, 聚合类型可能与 添加到状态的元素的类型不同。
 接口与 `ListState` 类似,但使用 `add(IN)` 添加的元素会用指定的 `AggregateFunction` 进行聚合。
 
-* `FoldingState<T, ACC>`: 保留一个单值,表示添加到状态的所有值的聚合。 与 `ReducingState` 相反,聚合类型可能与添加到状态的元素类型不同。 
-接口与 `ListState` 类似,但使用`add(T)`添加的元素会用指定的 `FoldFunction` 折叠成聚合值。
-
 * `MapState<UK, UV>`: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 `put(UK,UV)` 或者 `putAll(Map<UK,UV>)` 添加映射。
  使用 `get(UK)` 检索特定 key。 使用 `entries()`,`keys()` 和 `values()` 分别检索映射、键和值的可迭代视图。你还可以通过 `isEmpty()` 来判断是否包含任何键值对。
 
 所有类型的状态还有一个`clear()` 方法,清除当前 key 下的状态数据,也就是当前输入元素的 key。
 
-<span class="label label-danger">注意</span> `FoldingState` 和 `FoldingStateDescriptor` 从 Flink 1.4 开始就已经被启用,将会在未来被删除。
-作为替代请使用 `AggregatingState` 和 `AggregatingStateDescriptor`。
-
 请牢记,这些状态对象仅用于与状态交互。状态本身不一定存储在内存中,还可能在磁盘或其他位置。
 另外需要牢记的是从状态中获取的值取决于输入元素所代表的 key。 因此,在不同 key 上调用同一个接口,可能得到不同的值。
 
 你必须创建一个 `StateDescriptor`,才能得到对应的状态句柄。 这保存了状态名称(正如我们稍后将看到的,你可以创建多个状态,并且它们必须具有唯一的名称以便可以引用它们),
 状态所持有值的类型,并且可能包含用户指定的函数,例如`ReduceFunction`。 根据不同的状态类型,可以创建`ValueStateDescriptor`,`ListStateDescriptor`,
-`ReducingStateDescriptor`,`FoldingStateDescriptor` 或 `MapStateDescriptor`。
+`ReducingStateDescriptor` 或 `MapStateDescriptor`。
 
 状态通过 `RuntimeContext` 进行访问,因此只能在 *rich functions* 中使用。请参阅[这里]({% link dev/user_defined_functions.zh.md %}#rich-functions)获取相关信息,
 但是我们很快也会看到一个例子。`RichFunction` 中 `RuntimeContext` 提供如下方法:
@@ -132,7 +126,6 @@ keyed state 接口提供不同类型状态的访问接口,这些状态都作
 * `ReducingState<T> getReducingState(ReducingStateDescriptor<T>)`
 * `ListState<T> getListState(ListStateDescriptor<T>)`
 * `AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)`
-* `FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)`
 * `MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)`
 
 下面是一个 `FlatMapFunction` 的例子,展示了如何将这些部分组合起来:
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 23de3cc..2501109 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -1389,19 +1389,6 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
                }
 
                @Override
-               public <T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception {
-                       // return empty state for the legacy 1.2 Kafka consumer state
-                       return new TestingListState<>();
-               }
-
-               // ------------------------------------------------------------------------
-
-               @Override
-               public <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception {
-                       throw new UnsupportedOperationException();
-               }
-
-               @Override
                public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws Exception {
                        throw new UnsupportedOperationException();
                }
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index b53723c..bb9ccf8 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.rabbitmq;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -97,7 +98,7 @@ public class RMQSourceTest {
                OperatorStateStore mockStore = Mockito.mock(OperatorStateStore.class);
                FunctionInitializationContext mockContext = Mockito.mock(FunctionInitializationContext.class);
                Mockito.when(mockContext.getOperatorStateStore()).thenReturn(mockStore);
-               Mockito.when(mockStore.getSerializableListState(any(String.class))).thenReturn(null);
+               Mockito.when(mockStore.getListState(any(ListStateDescriptor.class))).thenReturn(null);
                return mockContext;
        }
 
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index 18c6e5a..89e4bc1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -30,8 +30,6 @@ import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
 import org.apache.flink.api.common.state.AggregatingState;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapState;
@@ -415,50 +413,6 @@ public interface RuntimeContext {
        <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties);
 
        /**
-        * Gets a handle to the system's key/value folding state. This state is similar to the state
-        * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
-        * aggregates values with different types.
-        *
-        * <p>This state is only accessible if the function is executed on a KeyedStream.
-        *
-        * <pre>{@code
-        * DataStream<MyType> stream = ...;
-        * KeyedStream<MyType> keyedStream = stream.keyBy("id");
-        *
-        * keyedStream.map(new RichMapFunction<MyType, List<MyType>>() {
-        *
-        *     private FoldingState<MyType, Long> state;
-        *
-        *     public void open(Configuration cfg) {
-        *         state = getRuntimeContext().getFoldingState(
-        *                 new FoldingStateDescriptor<>("sum", 0L, (a, b) -> a.count() + b, Long.class));
-        *     }
-        *
-        *     public Tuple2<MyType, Long> map(MyType value) {
-        *         state.add(value);
-        *         return new Tuple2<>(value, state.get());
-        *     }
-        * });
-        *
-        * }</pre>
-        *
-        * @param stateProperties The descriptor defining the properties of the stats.
-        *
-        * @param <T> Type of the values folded in the other state
-        * @param <ACC> Type of the value in the state
-        *
-        * @return The partitioned state object.
-        *
-        * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
-        *                                       function (function is not part of a KeyedStream).
-        *
-        * @deprecated will be removed in a future version in favor of {@link AggregatingState}
-        */
-       @PublicEvolving
-       @Deprecated
-       <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties);
-
-       /**
         * Gets a handle to the system's key/value map state. This state is similar to the state
         * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
         * is composed of user-defined key-value pairs
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index f63c45e..f73b17f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -33,8 +33,6 @@ import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.AggregatingState;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapState;
@@ -228,14 +226,6 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
 
        @Override
        @PublicEvolving
-       @Deprecated
-       public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
-               throw new UnsupportedOperationException(
-                               "This state is only accessible by functions executed on a KeyedStream");
-       }
-
-       @Override
-       @PublicEvolving
        public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
                throw new UnsupportedOperationException(
                                "This state is only accessible by functions executed on a KeyedStream");
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
index 7a998e6..aadbb23 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
@@ -20,7 +20,6 @@ package org.apache.flink.api.common.state;
 
 import org.apache.flink.annotation.PublicEvolving;
 
-import java.io.Serializable;
 import java.util.Set;
 
 /**
@@ -111,41 +110,4 @@ public interface OperatorStateStore {
         * @return set of names for all registered broadcast states.
         */
        Set<String> getRegisteredBroadcastStateNames();
-
-       // -------------------------------------------------------------------------------------------
-       //  Deprecated methods
-       // -------------------------------------------------------------------------------------------
-
-       /**
-        * Creates (or restores) a list state. Each state is registered under a unique name.
-        * The provided serializer is used to de/serialize the state in case of checkpointing (snapshot/restore).
-        *
-        * <p>The items in the list are repartitionable by the system in case of changed operator parallelism.
-        *
-        * @param stateDescriptor The descriptor for this state, providing a name and serializer.
-        * @param <S> The generic type of the state
-        *
-        * @return A list for all state partitions.
-        *
-        * @deprecated since 1.3.0. This was deprecated as part of a refinement to the function names.
-        *             Please use {@link #getListState(ListStateDescriptor)} instead.
-        */
-       @Deprecated
-       <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception;
-
-       /**
-        * Creates a state of the given name that uses Java serialization to persist the state. The items in the list
-        * are repartitionable by the system in case of changed operator parallelism.
-        *
-        * <p>This is a simple convenience method. For more flexibility on how state serialization
-        * should happen, use the {@link #getListState(ListStateDescriptor)} method.
-        *
-        * @param stateName The name of state to create
-        * @return A list state using Java serialization to serialize state objects.
-        *
-        * @deprecated since 1.3.0. Using Java serialization for persisting state is not encouraged.
-        *             Please use {@link #getListState(ListStateDescriptor)} instead.
-        */
-       @Deprecated
-       <T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception;
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java
index af7ca0c..6c6ccc3 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java
@@ -31,8 +31,6 @@ import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.AggregatingState;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapState;
@@ -202,11 +200,6 @@ class CepRuntimeContext implements RuntimeContext {
        }
 
        @Override
-       public <T, ACC> FoldingState<T, ACC> getFoldingState(final FoldingStateDescriptor<T, ACC> stateProperties) {
-               throw new UnsupportedOperationException("State is not supported.");
-       }
-
-       @Override
        public <UK, UV> MapState<UK, UV> getMapState(final MapStateDescriptor<UK, UV> stateProperties) {
                throw new UnsupportedOperationException("State is not supported.");
        }
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepRuntimeContextTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepRuntimeContextTest.java
index b77dd94..c45f8a0 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepRuntimeContextTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepRuntimeContextTest.java
@@ -23,11 +23,9 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
-import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
@@ -179,17 +177,6 @@ public class CepRuntimeContextTest extends TestLogger {
                }
 
                try {
-                       runtimeContext.getFoldingState(new FoldingStateDescriptor<>(
-                               "foobar",
-                               0,
-                               mock(FoldFunction.class),
-                               Integer.class));
-                       fail("Expected getFoldingState to fail with unsupported operation exception.");
-               } catch (UnsupportedOperationException e) {
-                       // expected
-               }
-
-               try {
                        runtimeContext.getMapState(new MapStateDescriptor<>("foobar", Integer.class, String.class));
                        fail("Expected getMapState to fail with unsupported operation exception.");
                } catch (UnsupportedOperationException e) {
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java
index dc25468..fb16a77 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java
@@ -31,8 +31,6 @@ import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.AggregatingState;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -230,17 +228,6 @@ public final class SavepointRuntimeContext implements RuntimeContext {
        }
 
        @Override
-       @Deprecated
-       public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
-               if (!stateRegistrationAllowed) {
-                       throw new RuntimeException(REGISTRATION_EXCEPTION_MSG);
-               }
-
-               registeredDescriptors.add(stateProperties);
-               return keyedStateStore.getFoldingState(stateProperties);
-       }
-
-       @Override
        public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
                if (!stateRegistrationAllowed) {
                        throw new RuntimeException(REGISTRATION_EXCEPTION_MSG);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 48c8eb8..2e32f16 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -220,31 +220,6 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
        }
 
        // -------------------------------------------------------------------------------------------
-       //  Deprecated state access methods
-       // -------------------------------------------------------------------------------------------
-
-       /**
-        * @deprecated This was deprecated as part of a refinement to the function names.
-        *             Please use {@link #getListState(ListStateDescriptor)} instead.
-        */
-       @Deprecated
-       @Override
-       public <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception {
-               return getListState(stateDescriptor);
-       }
-
-       /**
-        * @deprecated Using Java serialization for persisting state is not encouraged.
-        *             Please use {@link #getListState(ListStateDescriptor)} instead.
-        */
-       @SuppressWarnings("unchecked")
-       @Deprecated
-       @Override
-       public <T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception {
-               return (ListState<T>) getListState(new ListStateDescriptor<>(stateName, deprecatedDefaultJavaSerializer));
-       }
-
-       // -------------------------------------------------------------------------------------------
        //  Snapshot
        // -------------------------------------------------------------------------------------------
        @Nonnull
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 9ea2f98..981d6f5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -787,7 +787,7 @@ public class OperatorStateBackendTest {
                ListStateDescriptor<MutableType> stateDescriptor1 =
                                new ListStateDescriptor<>("test1", new JavaSerializer<MutableType>());
 
-               ListState<MutableType> listState1 = operatorStateBackend.getOperatorState(stateDescriptor1);
+               ListState<MutableType> listState1 = operatorStateBackend.getListState(stateDescriptor1);
 
                listState1.add(MutableType.of(42));
                listState1.add(MutableType.of(4711));
@@ -841,7 +841,7 @@ public class OperatorStateBackendTest {
                ListStateDescriptor<MutableType> stateDescriptor1 =
                                new ListStateDescriptor<>("test1", new JavaSerializer<MutableType>());
 
-               ListState<MutableType> listState1 = operatorStateBackend.getOperatorState(stateDescriptor1);
+               ListState<MutableType> listState1 = operatorStateBackend.getListState(stateDescriptor1);
 
                listState1.add(MutableType.of(42));
                listState1.add(MutableType.of(4711));
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
index 7b4ac6d..df379578 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
@@ -35,8 +35,6 @@ import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.AggregatingState;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapState;
@@ -186,11 +184,6 @@ public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction im
                }
 
                @Override
-               public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
-                       throw new UnsupportedOperationException("State is not supported in rich async functions.");
-               }
-
-               @Override
                public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
                        throw new UnsupportedOperationException("State is not supported in rich async functions.");
                }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index e451563..8f76a75 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -26,8 +26,6 @@ import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext;
 import org.apache.flink.api.common.state.AggregatingState;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -214,13 +212,6 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
        }
 
        @Override
-       public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
-               KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
-               stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
-               return keyedStateStore.getFoldingState(stateProperties);
-       }
-
-       @Override
        public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
                KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
                stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
index 2618e53..45c535d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
@@ -22,12 +22,10 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
-import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
@@ -197,17 +195,6 @@ public class RichAsyncFunctionTest {
                }
 
                try {
-                       runtimeContext.getFoldingState(new FoldingStateDescriptor<>("foobar", 0, new FoldFunction<Integer, Integer>() {
-                               @Override
-                               public Integer fold(Integer accumulator, Integer value) throws Exception {
-                                       return accumulator;
-                               }
-                       }, Integer.class));
-               } catch (UnsupportedOperationException e) {
-                       // expected
-               }
-
-               try {
                        runtimeContext.getMapState(new MapStateDescriptor<>("foobar", Integer.class, String.class));
                } catch (UnsupportedOperationException e) {
                        // expected
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
index fff4ef3..8ed41fc 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
@@ -22,10 +22,8 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.functions.AggregateFunction;
-import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapState;
@@ -158,32 +156,6 @@ public class StreamingRuntimeContextTest {
        }
 
        @Test
-       public void testFoldingStateInstantiation() throws Exception {
-
-               final ExecutionConfig config = new ExecutionConfig();
-               config.registerKryoType(Path.class);
-
-               final AtomicReference<Object> descriptorCapture = new AtomicReference<>();
-
-               StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config);
-
-               @SuppressWarnings("unchecked")
-               FoldFunction<String, TaskInfo> folder = (FoldFunction<String, TaskInfo>) mock(FoldFunction.class);
-
-               FoldingStateDescriptor<String, TaskInfo> descr =
-                               new FoldingStateDescriptor<>("name", null, folder, TaskInfo.class);
-
-               context.getFoldingState(descr);
-
-               FoldingStateDescriptor<?, ?> descrIntercepted = (FoldingStateDescriptor<?, ?>) descriptorCapture.get();
-               TypeSerializer<?> serializer = descrIntercepted.getSerializer();
-
-               // check that the Path class is really registered, i.e., the execution config was applied
-               assertTrue(serializer instanceof KryoSerializer);
-               assertTrue(((KryoSerializer<?>) serializer).getKryo().getRegistration(Path.class).getId() > 0);
-       }
-
-       @Test
        public void testListStateInstantiation() throws Exception {
 
                final ExecutionConfig config = new ExecutionConfig();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java
index 381eb80b..319f5c1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.streaming.api.functions.sink.filesystem.TestUtils;
 
-import java.io.Serializable;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -73,16 +72,6 @@ public class MockOperatorStateStore implements OperatorStateStore {
                throw new UnsupportedOperationException();
        }
 
-       @Override
-       public <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception {
-               return getListState(stateDescriptor);
-       }
-
-       @Override
-       public <T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception {
-               throw new UnsupportedOperationException();
-       }
-
        public void checkpointBegin(long checkpointId) {
                Map<String, TestUtils.MockListState> copiedStates = Collections.unmodifiableMap(copyStates(currentStateMap));
                historyStateMap.put(checkpointId, copiedStates);
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
index 8c78b7a..d8ac9a7 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
@@ -198,7 +198,7 @@ class StatefulJobSavepointMigrationITCase(
     }
 
     override def initializeState(context: FunctionInitializationContext): Unit = {
-      state = context.getOperatorStateStore.getOperatorState(
+      state = context.getOperatorStateStore.getListState(
         new ListStateDescriptor[CustomCaseClass](
           "sourceState", createTypeInformation[CustomCaseClass]))
     }
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
index 4acfa78..5ce76c4 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
@@ -30,8 +30,8 @@ import org.apache.flink.api.scala.createTypeInformation
 import org.apache.flink.api.scala.migration.CustomEnum.CustomEnum
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
-import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext, StateBackendLoader}
 import org.apache.flink.runtime.state.memory.MemoryStateBackend
+import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext, StateBackendLoader}
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
@@ -46,7 +46,7 @@ import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.{Assert, Ignore, Test}
 
-import scala.util.{Failure, Properties, Try}
+import scala.util.{Failure, Try}
 
 object StatefulJobWBroadcastStateMigrationITCase {
 
@@ -290,7 +290,7 @@ private class CheckpointedSource(val numElements: Int)
   }
 
   override def initializeState(context: FunctionInitializationContext): Unit = {
-    state = context.getOperatorStateStore.getOperatorState(
+    state = context.getOperatorStateStore.getListState(
       new ListStateDescriptor[CustomCaseClass](
         "sourceState", createTypeInformation[CustomCaseClass]))
   }
