This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 276332e11277e7e50789e4eaed0f11ae59a0f8d1 Author: Ghildiyal <manish.ghildi...@here.com> AuthorDate: Sat May 16 16:48:38 2020 +0530 [FLINK-17376] Remove deprecated state access methods --- docs/dev/stream/state/state.md | 10 +---- docs/dev/stream/state/state.zh.md | 9 +---- .../kafka/FlinkKafkaConsumerBaseTest.java | 13 ------ .../connectors/rabbitmq/RMQSourceTest.java | 3 +- .../flink/api/common/functions/RuntimeContext.java | 46 ---------------------- .../functions/util/AbstractRuntimeUDFContext.java | 10 ----- .../flink/api/common/state/OperatorStateStore.java | 38 ------------------ .../flink/cep/operator/CepRuntimeContext.java | 7 ---- .../flink/cep/operator/CepRuntimeContextTest.java | 13 ------ .../state/api/runtime/SavepointRuntimeContext.java | 13 ------ .../runtime/state/DefaultOperatorStateBackend.java | 25 ------------ .../runtime/state/OperatorStateBackendTest.java | 4 +- .../api/functions/async/RichAsyncFunction.java | 7 ---- .../api/operators/StreamingRuntimeContext.java | 9 ----- .../api/functions/async/RichAsyncFunctionTest.java | 13 ------ .../api/operators/StreamingRuntimeContextTest.java | 28 ------------- .../collect/utils/MockOperatorStateStore.java | 11 ------ .../StatefulJobSavepointMigrationITCase.scala | 2 +- ...StatefulJobWBroadcastStateMigrationITCase.scala | 6 +-- 19 files changed, 10 insertions(+), 257 deletions(-) diff --git a/docs/dev/stream/state/state.md b/docs/dev/stream/state/state.md index 728beab..389a03e 100644 --- a/docs/dev/stream/state/state.md +++ b/docs/dev/stream/state/state.md @@ -115,11 +115,6 @@ added to the state. Contrary to `ReducingState`, the aggregate type may be diffe of elements that are added to the state. The interface is the same as for `ListState` but elements added using `add(IN)` are aggregated using a specified `AggregateFunction`. -* `FoldingState<T, ACC>`: This keeps a single value that represents the aggregation of all values -added to the state. Contrary to `ReducingState`, the aggregate type may be different from the type -of elements that are added to the state. The interface is similar to `ListState` but elements -added using `add(T)` are folded into an aggregate using a specified `FoldFunction`. - * `MapState<UK, UV>`: This keeps a list of mappings. You can put key-value pairs into the state and retrieve an `Iterable` over all currently stored mappings. Mappings are added using `put(UK, UV)` or `putAll(Map<UK, UV>)`. The value associated with a user key can be retrieved using `get(UK)`. The iterable @@ -129,8 +124,6 @@ You can also use `isEmpty()` to check whether this map contains any key-value ma All types of state also have a method `clear()` that clears the state for the currently active key, i.e. the key of the input element. -<span class="label label-danger">Attention</span> `FoldingState` and `FoldingStateDescriptor` have been deprecated in Flink 1.4 and will be completely removed in the future. Please use `AggregatingState` and `AggregatingStateDescriptor` instead. - It is important to keep in mind that these state objects are only used for interfacing with state. The state is not necessarily stored inside but might reside on disk or somewhere else. The second thing to keep in mind is that the value you get from the state @@ -142,7 +135,7 @@ To get a state handle, you have to create a `StateDescriptor`. This holds the na that you can reference them), the type of the values that the state holds, and possibly a user-specified function, such as a `ReduceFunction`. Depending on what type of state you want to retrieve, you create either a `ValueStateDescriptor`, a `ListStateDescriptor`, -a `ReducingStateDescriptor`, a `FoldingStateDescriptor` or a `MapStateDescriptor`. +a `ReducingStateDescriptor`, or a `MapStateDescriptor`. State is accessed using the `RuntimeContext`, so it is only possible in *rich functions*. Please see [here]({% link dev/user_defined_functions.md %}#rich-functions) for @@ -153,7 +146,6 @@ is available in a `RichFunction` has these methods for accessing state: * `ReducingState<T> getReducingState(ReducingStateDescriptor<T>)` * `ListState<T> getListState(ListStateDescriptor<T>)` * `AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)` -* `FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)` * `MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)` This is an example `FlatMapFunction` that shows how all of the parts fit together: diff --git a/docs/dev/stream/state/state.zh.md b/docs/dev/stream/state/state.zh.md index 5fe4825..16e5e96 100644 --- a/docs/dev/stream/state/state.zh.md +++ b/docs/dev/stream/state/state.zh.md @@ -107,23 +107,17 @@ keyed state 接口提供不同类型状态的访问接口,这些状态都作 * `AggregatingState<IN, OUT>`: 保留一个单值,表示添加到状态的所有值的聚合。和 `ReducingState` 相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与 `ListState` 类似,但使用 `add(IN)` 添加的元素会用指定的 `AggregateFunction` 进行聚合。 -* `FoldingState<T, ACC>`: 保留一个单值,表示添加到状态的所有值的聚合。 与 `ReducingState` 相反,聚合类型可能与添加到状态的元素类型不同。 -接口与 `ListState` 类似,但使用`add(T)`添加的元素会用指定的 `FoldFunction` 折叠成聚合值。 - * `MapState<UK, UV>`: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 `put(UK,UV)` 或者 `putAll(Map<UK,UV>)` 添加映射。 使用 `get(UK)` 检索特定 key。 使用 `entries()`,`keys()` 和 `values()` 分别检索映射、键和值的可迭代视图。你还可以通过 `isEmpty()` 来判断是否包含任何键值对。 所有类型的状态还有一个`clear()` 方法,清除当前 key 下的状态数据,也就是当前输入元素的 key。 -<span class="label label-danger">注意</span> `FoldingState` 和 `FoldingStateDescriptor` 从 Flink 1.4 开始就已经被启用,将会在未来被删除。 -作为替代请使用 `AggregatingState` 和 `AggregatingStateDescriptor`。 - 请牢记,这些状态对象仅用于与状态交互。状态本身不一定存储在内存中,还可能在磁盘或其他位置。 另外需要牢记的是从状态中获取的值取决于输入元素所代表的 key。 因此,在不同 key 上调用同一个接口,可能得到不同的值。 你必须创建一个 `StateDescriptor`,才能得到对应的状态句柄。 这保存了状态名称(正如我们稍后将看到的,你可以创建多个状态,并且它们必须具有唯一的名称以便可以引用它们), 状态所持有值的类型,并且可能包含用户指定的函数,例如`ReduceFunction`。 根据不同的状态类型,可以创建`ValueStateDescriptor`,`ListStateDescriptor`, -`ReducingStateDescriptor`,`FoldingStateDescriptor` 或 `MapStateDescriptor`。 +`ReducingStateDescriptor` 或 `MapStateDescriptor`。 状态通过 `RuntimeContext` 进行访问,因此只能在 *rich functions* 中使用。请参阅[这里]({% link dev/user_defined_functions.zh.md %}#rich-functions)获取相关信息, 但是我们很快也会看到一个例子。`RichFunction` 中 `RuntimeContext` 提供如下方法: @@ -132,7 +126,6 @@ keyed state 接口提供不同类型状态的访问接口,这些状态都作 * `ReducingState<T> getReducingState(ReducingStateDescriptor<T>)` * `ListState<T> getListState(ListStateDescriptor<T>)` * `AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)` -* `FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)` * `MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)` 下面是一个 `FlatMapFunction` 的例子,展示了如何将这些部分组合起来: diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java index 23de3cc..2501109 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java @@ -1389,19 +1389,6 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger { } @Override - public <T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception { - // return empty state for the legacy 1.2 Kafka consumer state - return new TestingListState<>(); - } - - // ------------------------------------------------------------------------ - - @Override - public <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception { - throw new UnsupportedOperationException(); - } - - @Override public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws Exception { throw new UnsupportedOperationException(); } diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java index b53723c..bb9ccf8 100644 --- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java +++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.rabbitmq; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; @@ -97,7 +98,7 @@ public class RMQSourceTest { OperatorStateStore mockStore = Mockito.mock(OperatorStateStore.class); FunctionInitializationContext mockContext = Mockito.mock(FunctionInitializationContext.class); Mockito.when(mockContext.getOperatorStateStore()).thenReturn(mockStore); - Mockito.when(mockStore.getSerializableListState(any(String.class))).thenReturn(null); + Mockito.when(mockStore.getListState(any(ListStateDescriptor.class))).thenReturn(null); return mockContext; } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java index 18c6e5a..89e4bc1 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java @@ -30,8 +30,6 @@ import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.externalresource.ExternalResourceInfo; import org.apache.flink.api.common.state.AggregatingState; import org.apache.flink.api.common.state.AggregatingStateDescriptor; -import org.apache.flink.api.common.state.FoldingState; -import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.MapState; @@ -415,50 +413,6 @@ public interface RuntimeContext { <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties); /** - * Gets a handle to the system's key/value folding state. This state is similar to the state - * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that - * aggregates values with different types. - * - * <p>This state is only accessible if the function is executed on a KeyedStream. - * - * <pre>{@code - * DataStream<MyType> stream = ...; - * KeyedStream<MyType> keyedStream = stream.keyBy("id"); - * - * keyedStream.map(new RichMapFunction<MyType, List<MyType>>() { - * - * private FoldingState<MyType, Long> state; - * - * public void open(Configuration cfg) { - * state = getRuntimeContext().getFoldingState( - * new FoldingStateDescriptor<>("sum", 0L, (a, b) -> a.count() + b, Long.class)); - * } - * - * public Tuple2<MyType, Long> map(MyType value) { - * state.add(value); - * return new Tuple2<>(value, state.get()); - * } - * }); - * - * }</pre> - * - * @param stateProperties The descriptor defining the properties of the stats. - * - * @param <T> Type of the values folded in the other state - * @param <ACC> Type of the value in the state - * - * @return The partitioned state object. - * - * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the - * function (function is not part of a KeyedStream). - * - * @deprecated will be removed in a future version in favor of {@link AggregatingState} - */ - @PublicEvolving - @Deprecated - <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties); - - /** * Gets a handle to the system's key/value map state. This state is similar to the state * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that * is composed of user-defined key-value pairs diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java index f63c45e..f73b17f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java @@ -33,8 +33,6 @@ import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.state.AggregatingState; import org.apache.flink.api.common.state.AggregatingStateDescriptor; -import org.apache.flink.api.common.state.FoldingState; -import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.MapState; @@ -228,14 +226,6 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext { @Override @PublicEvolving - @Deprecated - public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) { - throw new UnsupportedOperationException( - "This state is only accessible by functions executed on a KeyedStream"); - } - - @Override - @PublicEvolving public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) { throw new UnsupportedOperationException( "This state is only accessible by functions executed on a KeyedStream"); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java index 7a998e6..aadbb23 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java @@ -20,7 +20,6 @@ package org.apache.flink.api.common.state; import org.apache.flink.annotation.PublicEvolving; -import java.io.Serializable; import java.util.Set; /** @@ -111,41 +110,4 @@ public interface OperatorStateStore { * @return set of names for all registered broadcast states. */ Set<String> getRegisteredBroadcastStateNames(); - - // ------------------------------------------------------------------------------------------- - // Deprecated methods - // ------------------------------------------------------------------------------------------- - - /** - * Creates (or restores) a list state. Each state is registered under a unique name. - * The provided serializer is used to de/serialize the state in case of checkpointing (snapshot/restore). - * - * <p>The items in the list are repartitionable by the system in case of changed operator parallelism. - * - * @param stateDescriptor The descriptor for this state, providing a name and serializer. - * @param <S> The generic type of the state - * - * @return A list for all state partitions. - * - * @deprecated since 1.3.0. This was deprecated as part of a refinement to the function names. - * Please use {@link #getListState(ListStateDescriptor)} instead. - */ - @Deprecated - <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception; - - /** - * Creates a state of the given name that uses Java serialization to persist the state. The items in the list - * are repartitionable by the system in case of changed operator parallelism. - * - * <p>This is a simple convenience method. For more flexibility on how state serialization - * should happen, use the {@link #getListState(ListStateDescriptor)} method. - * - * @param stateName The name of state to create - * @return A list state using Java serialization to serialize state objects. - * - * @deprecated since 1.3.0. Using Java serialization for persisting state is not encouraged. - * Please use {@link #getListState(ListStateDescriptor)} instead. - */ - @Deprecated - <T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception; } diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java index af7ca0c..6c6ccc3 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java @@ -31,8 +31,6 @@ import org.apache.flink.api.common.functions.BroadcastVariableInitializer; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.state.AggregatingState; import org.apache.flink.api.common.state.AggregatingStateDescriptor; -import org.apache.flink.api.common.state.FoldingState; -import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.MapState; @@ -202,11 +200,6 @@ class CepRuntimeContext implements RuntimeContext { } @Override - public <T, ACC> FoldingState<T, ACC> getFoldingState(final FoldingStateDescriptor<T, ACC> stateProperties) { - throw new UnsupportedOperationException("State is not supported."); - } - - @Override public <UK, UV> MapState<UK, UV> getMapState(final MapStateDescriptor<UK, UV> stateProperties) { throw new UnsupportedOperationException("State is not supported."); } diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepRuntimeContextTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepRuntimeContextTest.java index b77dd94..c45f8a0 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepRuntimeContextTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepRuntimeContextTest.java @@ -23,11 +23,9 @@ import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.BroadcastVariableInitializer; -import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.state.AggregatingStateDescriptor; -import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ReducingStateDescriptor; @@ -179,17 +177,6 @@ public class CepRuntimeContextTest extends TestLogger { } try { - runtimeContext.getFoldingState(new FoldingStateDescriptor<>( - "foobar", - 0, - mock(FoldFunction.class), - Integer.class)); - fail("Expected getFoldingState to fail with unsupported operation exception."); - } catch (UnsupportedOperationException e) { - // expected - } - - try { runtimeContext.getMapState(new MapStateDescriptor<>("foobar", Integer.class, String.class)); fail("Expected getMapState to fail with unsupported operation exception."); } catch (UnsupportedOperationException e) { diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java index dc25468..fb16a77 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java @@ -31,8 +31,6 @@ import org.apache.flink.api.common.functions.BroadcastVariableInitializer; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.state.AggregatingState; import org.apache.flink.api.common.state.AggregatingStateDescriptor; -import org.apache.flink.api.common.state.FoldingState; -import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.KeyedStateStore; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; @@ -230,17 +228,6 @@ public final class SavepointRuntimeContext implements RuntimeContext { } @Override - @Deprecated - public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) { - if (!stateRegistrationAllowed) { - throw new RuntimeException(REGISTRATION_EXCEPTION_MSG); - } - - registeredDescriptors.add(stateProperties); - return keyedStateStore.getFoldingState(stateProperties); - } - - @Override public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) { if (!stateRegistrationAllowed) { throw new RuntimeException(REGISTRATION_EXCEPTION_MSG); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java index 48c8eb8..2e32f16 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java @@ -220,31 +220,6 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { } // ------------------------------------------------------------------------------------------- - // Deprecated state access methods - // ------------------------------------------------------------------------------------------- - - /** - * @deprecated This was deprecated as part of a refinement to the function names. - * Please use {@link #getListState(ListStateDescriptor)} instead. - */ - @Deprecated - @Override - public <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception { - return getListState(stateDescriptor); - } - - /** - * @deprecated Using Java serialization for persisting state is not encouraged. - * Please use {@link #getListState(ListStateDescriptor)} instead. - */ - @SuppressWarnings("unchecked") - @Deprecated - @Override - public <T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception { - return (ListState<T>) getListState(new ListStateDescriptor<>(stateName, deprecatedDefaultJavaSerializer)); - } - - // ------------------------------------------------------------------------------------------- // Snapshot // ------------------------------------------------------------------------------------------- @Nonnull diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java index 9ea2f98..981d6f5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java @@ -787,7 +787,7 @@ public class OperatorStateBackendTest { ListStateDescriptor<MutableType> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<MutableType>()); - ListState<MutableType> listState1 = operatorStateBackend.getOperatorState(stateDescriptor1); + ListState<MutableType> listState1 = operatorStateBackend.getListState(stateDescriptor1); listState1.add(MutableType.of(42)); listState1.add(MutableType.of(4711)); @@ -841,7 +841,7 @@ public class OperatorStateBackendTest { ListStateDescriptor<MutableType> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<MutableType>()); - ListState<MutableType> listState1 = operatorStateBackend.getOperatorState(stateDescriptor1); + ListState<MutableType> listState1 = operatorStateBackend.getListState(stateDescriptor1); listState1.add(MutableType.of(42)); listState1.add(MutableType.of(4711)); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java index 7b4ac6d..df379578 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java @@ -35,8 +35,6 @@ import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.state.AggregatingState; import org.apache.flink.api.common.state.AggregatingStateDescriptor; -import org.apache.flink.api.common.state.FoldingState; -import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.MapState; @@ -186,11 +184,6 @@ public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction im } @Override - public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) { - throw new UnsupportedOperationException("State is not supported in rich async functions."); - } - - @Override public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) { throw new UnsupportedOperationException("State is not supported in rich async functions."); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java index e451563..8f76a75 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java @@ -26,8 +26,6 @@ import org.apache.flink.api.common.functions.BroadcastVariableInitializer; import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext; import org.apache.flink.api.common.state.AggregatingState; import org.apache.flink.api.common.state.AggregatingStateDescriptor; -import org.apache.flink.api.common.state.FoldingState; -import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.KeyedStateStore; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; @@ -214,13 +212,6 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { } @Override - public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) { - KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties); - stateProperties.initializeSerializerUnlessSet(getExecutionConfig()); - return keyedStateStore.getFoldingState(stateProperties); - } - - @Override public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) { KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties); stateProperties.initializeSerializerUnlessSet(getExecutionConfig()); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java index 2618e53..45c535d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java @@ -22,12 +22,10 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.BroadcastVariableInitializer; -import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.IterationRuntimeContext; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.state.AggregatingStateDescriptor; -import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ReducingStateDescriptor; @@ -197,17 +195,6 @@ public class RichAsyncFunctionTest { } try { - runtimeContext.getFoldingState(new FoldingStateDescriptor<>("foobar", 0, new FoldFunction<Integer, Integer>() { - @Override - public Integer fold(Integer accumulator, Integer value) throws Exception { - return accumulator; - } - }, Integer.class)); - } catch (UnsupportedOperationException e) { - // expected - } - - try { runtimeContext.getMapState(new MapStateDescriptor<>("foobar", Integer.class, String.class)); } catch (UnsupportedOperationException e) { // expected diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java index fff4ef3..8ed41fc 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java @@ -22,10 +22,8 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.functions.AggregateFunction; -import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.state.AggregatingStateDescriptor; -import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.MapState; @@ -158,32 +156,6 @@ public class StreamingRuntimeContextTest { } @Test - public void testFoldingStateInstantiation() throws Exception { - - final ExecutionConfig config = new ExecutionConfig(); - config.registerKryoType(Path.class); - - final AtomicReference<Object> descriptorCapture = new AtomicReference<>(); - - StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config); - - @SuppressWarnings("unchecked") - FoldFunction<String, TaskInfo> folder = (FoldFunction<String, TaskInfo>) mock(FoldFunction.class); - - FoldingStateDescriptor<String, TaskInfo> descr = - new FoldingStateDescriptor<>("name", null, folder, TaskInfo.class); - - context.getFoldingState(descr); - - FoldingStateDescriptor<?, ?> descrIntercepted = (FoldingStateDescriptor<?, ?>) descriptorCapture.get(); - TypeSerializer<?> serializer = descrIntercepted.getSerializer(); - - // check that the Path class is really registered, i.e., the execution config was applied - assertTrue(serializer instanceof KryoSerializer); - assertTrue(((KryoSerializer<?>) serializer).getKryo().getRegistration(Path.class).getId() > 0); - } - - @Test public void testListStateInstantiation() throws Exception { final ExecutionConfig config = new ExecutionConfig(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java index 381eb80b..319f5c1 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java @@ -24,7 +24,6 @@ import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.streaming.api.functions.sink.filesystem.TestUtils; -import java.io.Serializable; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -73,16 +72,6 @@ public class MockOperatorStateStore implements OperatorStateStore { throw new UnsupportedOperationException(); } - @Override - public <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception { - return getListState(stateDescriptor); - } - - @Override - public <T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception { - throw new UnsupportedOperationException(); - } - public void checkpointBegin(long checkpointId) { Map<String, TestUtils.MockListState> copiedStates = Collections.unmodifiableMap(copyStates(currentStateMap)); historyStateMap.put(checkpointId, copiedStates); diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala index 8c78b7a..d8ac9a7 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala @@ -198,7 +198,7 @@ class StatefulJobSavepointMigrationITCase( } override def initializeState(context: FunctionInitializationContext): Unit = { - state = context.getOperatorStateStore.getOperatorState( + state = context.getOperatorStateStore.getListState( new ListStateDescriptor[CustomCaseClass]( "sourceState", createTypeInformation[CustomCaseClass])) } diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala index 4acfa78..5ce76c4 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala @@ -30,8 +30,8 @@ import org.apache.flink.api.scala.createTypeInformation import org.apache.flink.api.scala.migration.CustomEnum.CustomEnum import org.apache.flink.configuration.Configuration import org.apache.flink.contrib.streaming.state.RocksDBStateBackend -import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext, StateBackendLoader} import org.apache.flink.runtime.state.memory.MemoryStateBackend +import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext, StateBackendLoader} import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment @@ -46,7 +46,7 @@ import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.{Assert, Ignore, Test} -import scala.util.{Failure, Properties, Try} +import scala.util.{Failure, Try} object StatefulJobWBroadcastStateMigrationITCase { @@ -290,7 +290,7 @@ private class CheckpointedSource(val numElements: Int) } override def initializeState(context: FunctionInitializationContext): Unit = { - state = context.getOperatorStateStore.getOperatorState( + state = context.getOperatorStateStore.getListState( new ListStateDescriptor[CustomCaseClass]( "sourceState", createTypeInformation[CustomCaseClass])) }