This is an automated email from the ASF dual-hosted git repository. zakelly pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4fce102f8448777f7c5eb1b7859de5cd1162621c Author: Zakelly <zakelly....@gmail.com> AuthorDate: Wed Jan 8 18:16:06 2025 +0800 [FLINK-37045][Runtime] Move state descriptor v2 to flink-core --- .../state/v2/AggregatingStateDescriptor.java | 4 +- .../api/common}/state/v2/ListStateDescriptor.java | 5 +- .../api/common}/state/v2/MapStateDescriptor.java | 5 +- .../common}/state/v2/ReducingStateDescriptor.java | 4 +- .../api/common}/state/v2/StateDescriptor.java | 7 +- .../common}/state/v2/StateSerializerReference.java | 6 +- .../api/common}/state/v2/ValueStateDescriptor.java | 5 +- .../state/v2/AggregatingStateDescriptorTest.java | 2 +- .../common}/state/v2/ListStateDescriptorTest.java | 2 +- .../common}/state/v2/MapStateDescriptorTest.java | 2 +- .../state/v2/ReducingStateDescriptorTest.java | 2 +- .../api/common}/state/v2/StateDescriptorTest.java | 2 +- .../common}/state/v2/ValueStateDescriptorTest.java | 2 +- .../impl/context/DefaultStateManager.java | 10 +-- .../AbstractAsyncStateStreamOperator.java | 2 +- .../AbstractAsyncStateStreamOperatorV2.java | 2 +- .../runtime/state/AsyncKeyedStateBackend.java | 2 +- .../runtime/state/DefaultOperatorStateBackend.java | 6 +- .../runtime/state/v2/AbstractAggregatingState.java | 1 + .../flink/runtime/state/v2/AbstractKeyedState.java | 1 + .../flink/runtime/state/v2/AbstractListState.java | 1 + .../flink/runtime/state/v2/AbstractMapState.java | 1 + .../runtime/state/v2/AbstractReducingState.java | 1 + .../flink/runtime/state/v2/AbstractValueState.java | 1 + .../state/v2/AggregatingStateDescriptor.java | 48 ++---------- .../runtime/state/v2/DefaultKeyedStateStore.java | 5 ++ .../flink/runtime/state/v2/KeyedStateStore.java | 5 ++ .../runtime/state/v2/ListStateDescriptor.java | 29 ++----- .../flink/runtime/state/v2/MapStateDescriptor.java | 89 ++-------------------- .../flink/runtime/state/v2/OperatorStateStore.java | 2 + .../runtime/state/v2/ReducingStateDescriptor.java | 43 ++--------- .../v2/RegisteredKeyValueStateBackendMetaInfo.java | 1 + .../runtime/state/v2/StateDescriptorUtils.java | 7 ++ .../runtime/state/v2/ValueStateDescriptor.java | 29 ++----- .../v2/adaptor/AsyncKeyedStateBackendAdaptor.java | 2 +- .../runtime/state/v2/ttl/TtlStateFactory.java | 12 +-- .../StreamGroupedReduceAsyncStateOperator.java | 2 +- .../api/operators/StreamOperatorStateHandler.java | 2 +- .../api/operators/StreamingRuntimeContext.java | 12 +-- .../AsyncExecutionControllerTest.java | 2 +- .../flink/runtime/state/StateBackendTestUtils.java | 4 +- .../state/v2/AbstractAggregatingStateTest.java | 1 + .../state/v2/AbstractKeyedStateTestBase.java | 1 + .../runtime/state/v2/AbstractListStateTest.java | 1 + .../runtime/state/v2/AbstractMapStateTest.java | 1 + .../state/v2/AbstractReducingStateTest.java | 1 + .../runtime/state/v2/AbstractValueStateTest.java | 1 + .../v2/AsyncKeyedStateBackendAdaptorTest.java | 6 ++ .../runtime/state/v2/StateBackendTestV2Base.java | 1 + .../api/operators/StreamingRuntimeContextTest.java | 44 ++++++----- .../flink/state/forst/ForStAggregatingState.java | 2 +- .../flink/state/forst/ForStKeyedStateBackend.java | 10 +-- .../apache/flink/state/forst/ForStListState.java | 2 +- .../apache/flink/state/forst/ForStMapState.java | 4 +- .../flink/state/forst/ForStReducingState.java | 2 +- .../apache/flink/state/forst/ForStValueState.java | 2 +- .../state/forst/ForStDBOperationTestBase.java | 8 +- .../flink/state/forst/ForStListStateTest.java | 2 +- .../ForStIncrementalSnapshotStrategyTest.java | 2 +- .../collect/utils/MockOperatorStateStore.java | 6 +- .../state/JoinRecordAsyncStateViews.java | 4 +- .../state/OuterJoinRecordAsyncStateViews.java | 4 +- 62 files changed, 176 insertions(+), 299 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AggregatingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/AggregatingStateDescriptor.java similarity index 96% copy from flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AggregatingStateDescriptor.java copy to flink-core/src/main/java/org/apache/flink/api/common/state/v2/AggregatingStateDescriptor.java index 1865647c792..26af7de75f2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AggregatingStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/AggregatingStateDescriptor.java @@ -16,8 +16,9 @@ * limitations under the License. */ -package org.apache.flink.runtime.state.v2; +package org.apache.flink.api.common.state.v2; +import org.apache.flink.annotation.Experimental; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -36,6 +37,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * @param <ACC> The type of the accumulator (intermediate aggregation state). * @param <OUT> The type of the values that are returned from the state. */ +@Experimental public class AggregatingStateDescriptor<IN, ACC, OUT> extends StateDescriptor<ACC> { private final AggregateFunction<IN, ACC, OUT> aggregateFunction; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ListStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ListStateDescriptor.java similarity index 94% copy from flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ListStateDescriptor.java copy to flink-core/src/main/java/org/apache/flink/api/common/state/v2/ListStateDescriptor.java index 3e41ee8b1f0..1409e6bc52e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ListStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ListStateDescriptor.java @@ -16,9 +16,9 @@ * limitations under the License. */ -package org.apache.flink.runtime.state.v2; +package org.apache.flink.api.common.state.v2; -import org.apache.flink.api.common.state.v2.ListState; +import org.apache.flink.annotation.Experimental; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -30,6 +30,7 @@ import javax.annotation.Nonnull; * * @param <T> The type of each value that the list state can hold. */ +@Experimental public class ListStateDescriptor<T> extends StateDescriptor<T> { /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/MapStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/MapStateDescriptor.java similarity index 97% copy from flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/MapStateDescriptor.java copy to flink-core/src/main/java/org/apache/flink/api/common/state/v2/MapStateDescriptor.java index f691094b71b..adfe55a5c83 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/MapStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/MapStateDescriptor.java @@ -16,12 +16,12 @@ * limitations under the License. */ -package org.apache.flink.runtime.state.v2; +package org.apache.flink.api.common.state.v2; +import org.apache.flink.annotation.Experimental; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.SerializerFactory; -import org.apache.flink.api.common.state.v2.MapState; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -35,6 +35,7 @@ import javax.annotation.Nullable; * @param <UK> The type of the user key for this map state. * @param <UV> The type of the values that the map state can hold. */ +@Experimental public class MapStateDescriptor<UK, UV> extends StateDescriptor<UV> { /** The serializer for the user key. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ReducingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ReducingStateDescriptor.java similarity index 96% copy from flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ReducingStateDescriptor.java copy to flink-core/src/main/java/org/apache/flink/api/common/state/v2/ReducingStateDescriptor.java index f93a07c3d94..f3bbf18fbe1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ReducingStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ReducingStateDescriptor.java @@ -16,8 +16,9 @@ * limitations under the License. */ -package org.apache.flink.runtime.state.v2; +package org.apache.flink.api.common.state.v2; +import org.apache.flink.annotation.Experimental; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -31,6 +32,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * * @param <T> The type of the values that can be added to the state. */ +@Experimental public class ReducingStateDescriptor<T> extends StateDescriptor<T> { private final ReduceFunction<T> reduceFunction; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/StateDescriptor.java similarity index 97% rename from flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java rename to flink-core/src/main/java/org/apache/flink/api/common/state/v2/StateDescriptor.java index d7f19fc4a13..3b16231462c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/StateDescriptor.java @@ -16,8 +16,9 @@ * limitations under the License. */ -package org.apache.flink.runtime.state.v2; +package org.apache.flink.api.common.state.v2; +import org.apache.flink.annotation.Experimental; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.SerializerFactory; @@ -38,12 +39,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * * @param <T> The type of the value of the state object described by this state descriptor. */ -@Internal +@Experimental public abstract class StateDescriptor<T> implements Serializable { private static final long serialVersionUID = 1L; /** An enumeration of the types of supported states. */ + @Internal public enum Type { VALUE, LIST, @@ -184,5 +186,6 @@ public abstract class StateDescriptor<T> implements Serializable { } /** Return the specific {@code Type} of described state. */ + @Internal public abstract Type getType(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateSerializerReference.java b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/StateSerializerReference.java similarity index 95% rename from flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateSerializerReference.java rename to flink-core/src/main/java/org/apache/flink/api/common/state/v2/StateSerializerReference.java index 8b35b2a5487..d71c949301b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateSerializerReference.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/StateSerializerReference.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.runtime.state.v2; +package org.apache.flink.api.common.state.v2; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; @@ -36,10 +36,12 @@ import static org.apache.flink.util.Preconditions.checkState; /** * A reference to a serializer. This also provides functions for lazy initialization. + * Package-private for internal use only. * * @param <T> the type for serialization. */ -public class StateSerializerReference<T> extends AtomicReference<TypeSerializer<T>> { +@Internal +class StateSerializerReference<T> extends AtomicReference<TypeSerializer<T>> { private static final Logger LOG = LoggerFactory.getLogger(StateSerializerReference.class); /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ValueStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ValueStateDescriptor.java similarity index 95% copy from flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ValueStateDescriptor.java copy to flink-core/src/main/java/org/apache/flink/api/common/state/v2/ValueStateDescriptor.java index 20437cd4ec7..dc5b1251cbd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ValueStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ValueStateDescriptor.java @@ -16,9 +16,9 @@ * limitations under the License. */ -package org.apache.flink.runtime.state.v2; +package org.apache.flink.api.common.state.v2; -import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.annotation.Experimental; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -30,6 +30,7 @@ import javax.annotation.Nonnull; * * @param <T> The type of the values that the value state can hold. */ +@Experimental public class ValueStateDescriptor<T> extends StateDescriptor<T> { /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AggregatingStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/v2/AggregatingStateDescriptorTest.java similarity index 98% rename from flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AggregatingStateDescriptorTest.java rename to flink-core/src/test/java/org/apache/flink/api/common/state/v2/AggregatingStateDescriptorTest.java index 930a0c69124..0b0685acfff 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AggregatingStateDescriptorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/v2/AggregatingStateDescriptorTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.runtime.state.v2; +package org.apache.flink.api.common.state.v2; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/ListStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/v2/ListStateDescriptorTest.java similarity index 97% rename from flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/ListStateDescriptorTest.java rename to flink-core/src/test/java/org/apache/flink/api/common/state/v2/ListStateDescriptorTest.java index 9df21b4f1c9..e719ae2b8ed 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/ListStateDescriptorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/v2/ListStateDescriptorTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.runtime.state.v2; +package org.apache.flink.api.common.state.v2; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.core.testutils.CommonTestUtils; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/MapStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/v2/MapStateDescriptorTest.java similarity index 98% rename from flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/MapStateDescriptorTest.java rename to flink-core/src/test/java/org/apache/flink/api/common/state/v2/MapStateDescriptorTest.java index 7ea88ad79b5..8fcb9da187a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/MapStateDescriptorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/v2/MapStateDescriptorTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.runtime.state.v2; +package org.apache.flink.api.common.state.v2; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.core.testutils.CommonTestUtils; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/ReducingStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/v2/ReducingStateDescriptorTest.java similarity index 98% rename from flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/ReducingStateDescriptorTest.java rename to flink-core/src/test/java/org/apache/flink/api/common/state/v2/ReducingStateDescriptorTest.java index f1f789bca61..f3879ed7be0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/ReducingStateDescriptorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/v2/ReducingStateDescriptorTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.runtime.state.v2; +package org.apache.flink.api.common.state.v2; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/v2/StateDescriptorTest.java similarity index 99% rename from flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateDescriptorTest.java rename to flink-core/src/test/java/org/apache/flink/api/common/state/v2/StateDescriptorTest.java index f0139114a49..1c1587c68be 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateDescriptorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/v2/StateDescriptorTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.runtime.state.v2; +package org.apache.flink.api.common.state.v2; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.StateTtlConfig; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/ValueStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/v2/ValueStateDescriptorTest.java similarity index 97% rename from flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/ValueStateDescriptorTest.java rename to flink-core/src/test/java/org/apache/flink/api/common/state/v2/ValueStateDescriptorTest.java index f1bf3820825..edcdf42d7ce 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/ValueStateDescriptorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/v2/ValueStateDescriptorTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.runtime.state.v2; +package org.apache.flink.api.common.state.v2; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.core.testutils.CommonTestUtils; diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultStateManager.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultStateManager.java index d4074d1cfdc..39018a4b5ff 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultStateManager.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultStateManager.java @@ -27,18 +27,18 @@ import org.apache.flink.api.common.state.ReducingStateDeclaration; import org.apache.flink.api.common.state.StateDeclaration; import org.apache.flink.api.common.state.ValueStateDeclaration; import org.apache.flink.api.common.state.v2.AggregatingState; +import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor; import org.apache.flink.api.common.state.v2.ListState; +import org.apache.flink.api.common.state.v2.ListStateDescriptor; import org.apache.flink.api.common.state.v2.MapState; +import org.apache.flink.api.common.state.v2.MapStateDescriptor; import org.apache.flink.api.common.state.v2.ReducingState; +import org.apache.flink.api.common.state.v2.ReducingStateDescriptor; import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.api.common.state.v2.ValueStateDescriptor; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.datastream.api.context.StateManager; -import org.apache.flink.runtime.state.v2.AggregatingStateDescriptor; -import org.apache.flink.runtime.state.v2.ListStateDescriptor; -import org.apache.flink.runtime.state.v2.MapStateDescriptor; import org.apache.flink.runtime.state.v2.OperatorStateStore; -import org.apache.flink.runtime.state.v2.ReducingStateDescriptor; -import org.apache.flink.runtime.state.v2.ValueStateDescriptor; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.util.Preconditions; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java index 26c948ffc48..ad9ce787cac 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java @@ -23,6 +23,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.api.common.state.v2.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; @@ -32,7 +33,6 @@ import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager; import org.apache.flink.runtime.event.WatermarkEvent; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.AsyncKeyedStateBackend; -import org.apache.flink.runtime.state.v2.StateDescriptor; import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.Input; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java index 5a60c455b73..ab3eff901fa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.asyncprocessing.operators; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.api.common.state.v2.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; @@ -29,7 +30,6 @@ import org.apache.flink.runtime.asyncprocessing.RecordContext; import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.AsyncKeyedStateBackend; -import org.apache.flink.runtime.state.v2.StateDescriptor; import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor; import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2; import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java index bdd1571fbce..21fdf4b6e2f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java @@ -21,13 +21,13 @@ package org.apache.flink.runtime.state; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.state.InternalCheckpointListener; import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.api.common.state.v2.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; import org.apache.flink.runtime.asyncprocessing.RecordContext; import org.apache.flink.runtime.asyncprocessing.StateExecutor; import org.apache.flink.runtime.asyncprocessing.StateRequestHandler; import org.apache.flink.runtime.checkpoint.SnapshotType; -import org.apache.flink.runtime.state.v2.StateDescriptor; import org.apache.flink.runtime.state.v2.internal.InternalKeyedState; import org.apache.flink.util.Disposable; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java index 921409ad31a..f2f4e1cf97a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java @@ -224,14 +224,14 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { @Override public <K, V> BroadcastState<K, V> getBroadcastState( - org.apache.flink.runtime.state.v2.MapStateDescriptor<K, V> stateDescriptor) + org.apache.flink.api.common.state.v2.MapStateDescriptor<K, V> stateDescriptor) throws Exception { return getBroadcastState(StateDescriptorUtils.transformFromV2ToV1(stateDescriptor)); } @Override public <S> org.apache.flink.api.common.state.v2.ListState<S> getListState( - org.apache.flink.runtime.state.v2.ListStateDescriptor<S> stateDescriptor) + org.apache.flink.api.common.state.v2.ListStateDescriptor<S> stateDescriptor) throws Exception { return new OperatorListStateAdaptor<>( getListState(StateDescriptorUtils.transformFromV2ToV1(stateDescriptor))); @@ -239,7 +239,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { @Override public <S> org.apache.flink.api.common.state.v2.ListState<S> getUnionListState( - org.apache.flink.runtime.state.v2.ListStateDescriptor<S> stateDescriptor) + org.apache.flink.api.common.state.v2.ListStateDescriptor<S> stateDescriptor) throws Exception { return new OperatorListStateAdaptor<>( getListState(StateDescriptorUtils.transformFromV2ToV1(stateDescriptor))); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractAggregatingState.java index 95850d928ce..222c3688913 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractAggregatingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractAggregatingState.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.v2; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.state.v2.AggregatingState; +import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor; import org.apache.flink.api.common.state.v2.StateFuture; import org.apache.flink.core.state.StateFutureUtils; import org.apache.flink.runtime.asyncprocessing.StateRequestHandler; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractKeyedState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractKeyedState.java index d1492410992..2056d710ce6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractKeyedState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractKeyedState.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.state.v2; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.api.common.state.v2.StateDescriptor; import org.apache.flink.api.common.state.v2.StateFuture; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractListState.java index 5c7a6ba6bf3..263de23f31a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractListState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractListState.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.state.v2; import org.apache.flink.api.common.state.v2.ListState; +import org.apache.flink.api.common.state.v2.ListStateDescriptor; import org.apache.flink.api.common.state.v2.StateFuture; import org.apache.flink.api.common.state.v2.StateIterator; import org.apache.flink.runtime.asyncprocessing.StateRequestHandler; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractMapState.java index 108b3128344..fb3ad546aed 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractMapState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractMapState.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.state.v2; import org.apache.flink.api.common.state.v2.MapState; +import org.apache.flink.api.common.state.v2.MapStateDescriptor; import org.apache.flink.api.common.state.v2.StateFuture; import org.apache.flink.api.common.state.v2.StateIterator; import org.apache.flink.api.java.tuple.Tuple2; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractReducingState.java index 4fb7967bd6c..daf22c4398a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractReducingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractReducingState.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.state.v2; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.state.v2.ReducingState; +import org.apache.flink.api.common.state.v2.ReducingStateDescriptor; import org.apache.flink.api.common.state.v2.StateFuture; import org.apache.flink.core.state.StateFutureUtils; import org.apache.flink.runtime.asyncprocessing.StateRequestHandler; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractValueState.java index 0affbf415f3..eda66c9cea2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractValueState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractValueState.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.state.v2; import org.apache.flink.api.common.state.v2.StateFuture; import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.api.common.state.v2.ValueStateDescriptor; import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; import org.apache.flink.runtime.asyncprocessing.StateRequestHandler; import org.apache.flink.runtime.asyncprocessing.StateRequestType; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AggregatingStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AggregatingStateDescriptor.java index 1865647c792..078b189fbe1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AggregatingStateDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AggregatingStateDescriptor.java @@ -24,59 +24,25 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import javax.annotation.Nonnull; -import static org.apache.flink.util.Preconditions.checkNotNull; - /** - * A {@link StateDescriptor} for {@link org.apache.flink.api.common.state.v2.AggregatingState}. - * - * <p>The type internally stored in the state is the type of the {@code Accumulator} of the {@code - * AggregateFunction}. - * - * @param <IN> The type of the values that are added to the state. - * @param <ACC> The type of the accumulator (intermediate aggregation state). - * @param <OUT> The type of the values that are returned from the state. + * This class is deprecated and only a placeholder for compatibility for EXISTING PRs. This is never + * released before, and will be safely removed before 2.0 release. */ -public class AggregatingStateDescriptor<IN, ACC, OUT> extends StateDescriptor<ACC> { - - private final AggregateFunction<IN, ACC, OUT> aggregateFunction; +@Deprecated +public class AggregatingStateDescriptor<IN, ACC, OUT> + extends org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<IN, ACC, OUT> { - /** - * Create a new state descriptor with the given name, function, and type. - * - * @param stateId The (unique) name for the state. - * @param aggregateFunction The {@code AggregateFunction} used to aggregate the state. - * @param typeInfo The type of the accumulator. The accumulator is stored in the state. - */ public AggregatingStateDescriptor( @Nonnull String stateId, @Nonnull AggregateFunction<IN, ACC, OUT> aggregateFunction, @Nonnull TypeInformation<ACC> typeInfo) { - super(stateId, typeInfo); - this.aggregateFunction = checkNotNull(aggregateFunction); + super(stateId, aggregateFunction, typeInfo); } - /** - * Create a new {@code ReducingStateDescriptor} with the given stateId and the given type - * serializer. - * - * @param stateId The (unique) stateId for the state. - * @param serializer The type serializer for accumulator. - */ public AggregatingStateDescriptor( @Nonnull String stateId, @Nonnull AggregateFunction<IN, ACC, OUT> aggregateFunction, @Nonnull TypeSerializer<ACC> serializer) { - super(stateId, serializer); - this.aggregateFunction = checkNotNull(aggregateFunction); - } - - /** Returns the Aggregate function for this state. */ - public AggregateFunction<IN, ACC, OUT> getAggregateFunction() { - return aggregateFunction; - } - - @Override - public Type getType() { - return Type.AGGREGATING; + super(stateId, aggregateFunction, serializer); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/DefaultKeyedStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/DefaultKeyedStateStore.java index 92eed6decd9..52565b7fad6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/DefaultKeyedStateStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/DefaultKeyedStateStore.java @@ -20,10 +20,15 @@ package org.apache.flink.runtime.state.v2; import org.apache.flink.api.common.functions.SerializerFactory; import org.apache.flink.api.common.state.v2.AggregatingState; +import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor; import org.apache.flink.api.common.state.v2.ListState; +import org.apache.flink.api.common.state.v2.ListStateDescriptor; import org.apache.flink.api.common.state.v2.MapState; +import org.apache.flink.api.common.state.v2.MapStateDescriptor; import org.apache.flink.api.common.state.v2.ReducingState; +import org.apache.flink.api.common.state.v2.ReducingStateDescriptor; import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.api.common.state.v2.ValueStateDescriptor; import org.apache.flink.runtime.state.AsyncKeyedStateBackend; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/KeyedStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/KeyedStateStore.java index ccb58e5c763..b47b3b51623 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/KeyedStateStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/KeyedStateStore.java @@ -20,11 +20,16 @@ package org.apache.flink.runtime.state.v2; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.state.v2.AggregatingState; +import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor; import org.apache.flink.api.common.state.v2.ListState; +import org.apache.flink.api.common.state.v2.ListStateDescriptor; import org.apache.flink.api.common.state.v2.MapState; +import org.apache.flink.api.common.state.v2.MapStateDescriptor; import org.apache.flink.api.common.state.v2.ReducingState; +import org.apache.flink.api.common.state.v2.ReducingStateDescriptor; import org.apache.flink.api.common.state.v2.State; import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.api.common.state.v2.ValueStateDescriptor; import javax.annotation.Nonnull; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ListStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ListStateDescriptor.java index 3e41ee8b1f0..6e22610602c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ListStateDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ListStateDescriptor.java @@ -18,43 +18,24 @@ package org.apache.flink.runtime.state.v2; -import org.apache.flink.api.common.state.v2.ListState; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import javax.annotation.Nonnull; /** - * {@link StateDescriptor} for {@link ListState}. This can be used to create partitioned list state - * internally. - * - * @param <T> The type of each value that the list state can hold. + * This class is deprecated and only a placeholder for compatibility for EXISTING PRs. This is never + * released before, and will be safely removed before 2.0 release. */ -public class ListStateDescriptor<T> extends StateDescriptor<T> { +@Deprecated +public class ListStateDescriptor<T> + extends org.apache.flink.api.common.state.v2.ListStateDescriptor<T> { - /** - * Creates a new {@code ListStateDescriptor} with the given stateId and type. - * - * @param stateId The (unique) stateId for the state. - * @param typeInfo The type of the values in the state. - */ public ListStateDescriptor(@Nonnull String stateId, @Nonnull TypeInformation<T> typeInfo) { super(stateId, typeInfo); } - /** - * Create a new {@code ListStateDescriptor} with the given stateId and the given type - * serializer. - * - * @param stateId The (unique) stateId for the state. - * @param serializer The type serializer for the values in the state. - */ public ListStateDescriptor(@Nonnull String stateId, @Nonnull TypeSerializer<T> serializer) { super(stateId, serializer); } - - @Override - public Type getType() { - return Type.LIST; - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/MapStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/MapStateDescriptor.java index f691094b71b..37573e25e64 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/MapStateDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/MapStateDescriptor.java @@ -18,105 +18,30 @@ package org.apache.flink.runtime.state.v2; -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.SerializerFactory; -import org.apache.flink.api.common.state.v2.MapState; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import javax.annotation.Nonnull; -import javax.annotation.Nullable; /** - * {@link StateDescriptor} for {@link MapState}. This can be used to create partitioned map state - * internally. - * - * @param <UK> The type of the user key for this map state. - * @param <UV> The type of the values that the map state can hold. + * This class is deprecated and only a placeholder for compatibility for EXISTING PRs. This is never + * released before, and will be safely removed before 2.0 release. */ -public class MapStateDescriptor<UK, UV> extends StateDescriptor<UV> { - - /** The serializer for the user key. */ - @Nonnull private final StateSerializerReference<UK> userKeySerializer; +@Deprecated +public class MapStateDescriptor<UK, UV> + extends org.apache.flink.api.common.state.v2.MapStateDescriptor<UK, UV> { - /** - * Creates a new {@code MapStateDescriptor} with the given stateId and type. - * - * @param stateId The (unique) stateId for the state. - * @param userKeyTypeInfo The type of the user keys in the state. - * @param userValueTypeInfo The type of the values in the state. - */ public MapStateDescriptor( @Nonnull String stateId, @Nonnull TypeInformation<UK> userKeyTypeInfo, @Nonnull TypeInformation<UV> userValueTypeInfo) { - super(stateId, userValueTypeInfo); - this.userKeySerializer = new StateSerializerReference<>(userKeyTypeInfo); + super(stateId, userKeyTypeInfo, userValueTypeInfo); } - /** - * Create a new {@code MapStateDescriptor} with the given stateId and the given type serializer. - * - * @param stateId The (unique) stateId for the state. - * @param userKeySerializer The serializer for the user keys in the state. - * @param userValueSerializer The serializer for the user values in the state. - */ public MapStateDescriptor( @Nonnull String stateId, @Nonnull TypeSerializer<UK> userKeySerializer, @Nonnull TypeSerializer<UV> userValueSerializer) { - super(stateId, userValueSerializer); - this.userKeySerializer = new StateSerializerReference<>(userKeySerializer); - } - - @Nonnull - public TypeSerializer<UK> getUserKeySerializer() { - TypeSerializer<UK> serializer = userKeySerializer.get(); - if (serializer != null) { - return serializer.duplicate(); - } else { - throw new IllegalStateException("Serializer not yet initialized."); - } - } - - @Internal - @Nullable - public TypeInformation<UK> getUserKeyTypeInformation() { - return userKeySerializer.getTypeInformation(); - } - - /** - * Checks whether the serializer has been initialized. Serializer initialization is lazy, to - * allow parametrization of serializers with an {@link ExecutionConfig} via {@link - * #initializeSerializerUnlessSet(ExecutionConfig)}. - * - * @return True if the serializers have been initialized, false otherwise. - */ - @Override - public boolean isSerializerInitialized() { - return super.isSerializerInitialized() && userKeySerializer.isInitialized(); - } - - /** - * Initializes the serializer, unless it has been initialized before. - * - * @param executionConfig The execution config to use when creating the serializer. - */ - @Override - public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) { - super.initializeSerializerUnlessSet(executionConfig); - userKeySerializer.initializeUnlessSet(executionConfig); - } - - @Override - public void initializeSerializerUnlessSet(SerializerFactory serializerFactory) { - super.initializeSerializerUnlessSet(serializerFactory); - userKeySerializer.initializeUnlessSet(serializerFactory); - } - - @Override - public Type getType() { - return Type.MAP; + super(stateId, userKeySerializer, userValueSerializer); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/OperatorStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/OperatorStateStore.java index b9a28858b4f..50bd0c0a8d0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/OperatorStateStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/OperatorStateStore.java @@ -21,6 +21,8 @@ package org.apache.flink.runtime.state.v2; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.v2.ListState; +import org.apache.flink.api.common.state.v2.ListStateDescriptor; +import org.apache.flink.api.common.state.v2.MapStateDescriptor; import java.util.Set; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ReducingStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ReducingStateDescriptor.java index f93a07c3d94..053b9704eaa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ReducingStateDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ReducingStateDescriptor.java @@ -24,54 +24,25 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import javax.annotation.Nonnull; -import static org.apache.flink.util.Preconditions.checkNotNull; - /** - * {@link StateDescriptor} for {@link org.apache.flink.api.common.state.v2.ReducingState}. - * - * @param <T> The type of the values that can be added to the state. + * This class is deprecated and only a placeholder for compatibility for EXISTING PRs. This is never + * released before, and will be safely removed before 2.0 release. */ -public class ReducingStateDescriptor<T> extends StateDescriptor<T> { +@Deprecated +public class ReducingStateDescriptor<T> + extends org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T> { - private final ReduceFunction<T> reduceFunction; - - /** - * Creates a new {@code ReducingStateDescriptor} with the given name and default value. - * - * @param name The (unique) name for the state. - * @param reduceFunction The {@code ReduceFunction} used to aggregate the state. - * @param typeInfo The type of the values in the state. - */ public ReducingStateDescriptor( @Nonnull String name, @Nonnull ReduceFunction<T> reduceFunction, @Nonnull TypeInformation<T> typeInfo) { - super(name, typeInfo); - this.reduceFunction = checkNotNull(reduceFunction); + super(name, reduceFunction, typeInfo); } - /** - * Create a new {@code ReducingStateDescriptor} with the given stateId and the given type - * serializer. - * - * @param stateId The (unique) stateId for the state. - * @param serializer The type serializer for the values in the state. - */ public ReducingStateDescriptor( @Nonnull String stateId, @Nonnull ReduceFunction<T> reduceFunction, @Nonnull TypeSerializer<T> serializer) { - super(stateId, serializer); - this.reduceFunction = checkNotNull(reduceFunction); - } - - /** Returns the reduce function to be used for the reducing state. */ - public ReduceFunction<T> getReduceFunction() { - return reduceFunction; - } - - @Override - public Type getType() { - return Type.REDUCING; + super(stateId, reduceFunction, serializer); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/RegisteredKeyValueStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/RegisteredKeyValueStateBackendMetaInfo.java index 7f4667e9408..45107561f9a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/RegisteredKeyValueStateBackendMetaInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/RegisteredKeyValueStateBackendMetaInfo.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.state.v2; +import org.apache.flink.api.common.state.v2.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptorUtils.java index ca57a657852..8b62628488f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptorUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptorUtils.java @@ -18,6 +18,13 @@ package org.apache.flink.runtime.state.v2; +import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.v2.ListStateDescriptor; +import org.apache.flink.api.common.state.v2.MapStateDescriptor; +import org.apache.flink.api.common.state.v2.ReducingStateDescriptor; +import org.apache.flink.api.common.state.v2.StateDescriptor; +import org.apache.flink.api.common.state.v2.ValueStateDescriptor; + /** * Utilities for transforming {@link StateDescriptor} to {@link * org.apache.flink.api.common.state.StateDescriptor}. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ValueStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ValueStateDescriptor.java index 20437cd4ec7..70e45ee9c3a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ValueStateDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ValueStateDescriptor.java @@ -18,43 +18,24 @@ package org.apache.flink.runtime.state.v2; -import org.apache.flink.api.common.state.v2.ValueState; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import javax.annotation.Nonnull; /** - * {@link StateDescriptor} for {@link ValueState}. This can be used to create partitioned value - * state internally. - * - * @param <T> The type of the values that the value state can hold. + * This class is deprecated and only a placeholder for compatibility for EXISTING PRs. This is never + * released before, and will be safely removed before 2.0 release. */ -public class ValueStateDescriptor<T> extends StateDescriptor<T> { +@Deprecated +public class ValueStateDescriptor<T> + extends org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> { - /** - * Creates a new {@code ValueStateDescriptor} with the given stateId and type. - * - * @param stateId The (unique) stateId for the state. - * @param typeInfo The type of the values in the state. - */ public ValueStateDescriptor(@Nonnull String stateId, @Nonnull TypeInformation<T> typeInfo) { super(stateId, typeInfo); } - /** - * Create a new {@code ValueStateDescriptor} with the given stateId and the given type - * serializer. - * - * @param stateId The (unique) stateId for the state. - * @param serializer The type serializer for the values in the state. - */ public ValueStateDescriptor(@Nonnull String stateId, @Nonnull TypeSerializer<T> serializer) { super(stateId, serializer); } - - @Override - public Type getType() { - return Type.VALUE; - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java index d0ec7188fd4..dae4a959d8d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.state.v2.adaptor; import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.api.common.state.InternalCheckpointListener; import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.api.common.state.v2.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.asyncprocessing.RecordContext; import org.apache.flink.runtime.asyncprocessing.StateExecutor; @@ -44,7 +45,6 @@ import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.runtime.state.internal.InternalMapState; import org.apache.flink.runtime.state.internal.InternalReducingState; import org.apache.flink.runtime.state.internal.InternalValueState; -import org.apache.flink.runtime.state.v2.StateDescriptor; import org.apache.flink.runtime.state.v2.StateDescriptorUtils; import org.apache.flink.runtime.state.v2.internal.InternalKeyedState; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlStateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlStateFactory.java index e4b946dbeed..8a820490228 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlStateFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlStateFactory.java @@ -19,7 +19,13 @@ package org.apache.flink.runtime.state.v2.ttl; import org.apache.flink.api.common.state.StateTtlConfig; +import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.v2.ListStateDescriptor; +import org.apache.flink.api.common.state.v2.MapStateDescriptor; +import org.apache.flink.api.common.state.v2.ReducingStateDescriptor; import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.api.common.state.v2.StateDescriptor; +import org.apache.flink.api.common.state.v2.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.CompositeSerializer; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -31,12 +37,6 @@ import org.apache.flink.runtime.state.ttl.TtlReduceFunction; import org.apache.flink.runtime.state.ttl.TtlStateContext; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.runtime.state.ttl.TtlValue; -import org.apache.flink.runtime.state.v2.AggregatingStateDescriptor; -import org.apache.flink.runtime.state.v2.ListStateDescriptor; -import org.apache.flink.runtime.state.v2.MapStateDescriptor; -import org.apache.flink.runtime.state.v2.ReducingStateDescriptor; -import org.apache.flink.runtime.state.v2.StateDescriptor; -import org.apache.flink.runtime.state.v2.ValueStateDescriptor; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.SupplierWithException; diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceAsyncStateOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceAsyncStateOperator.java index 189769d11cb..9794e25ff72 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceAsyncStateOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceAsyncStateOperator.java @@ -20,9 +20,9 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.api.common.state.v2.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator; -import org.apache.flink.runtime.state.v2.ValueStateDescriptor; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; /** diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java index 56d32bf1f3f..bf01d4df0c7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java @@ -421,7 +421,7 @@ public class StreamOperatorStateHandler { public <N, S extends org.apache.flink.api.common.state.v2.State, T> S getOrCreateKeyedState( N defaultNamespace, TypeSerializer<N> namespaceSerializer, - org.apache.flink.runtime.state.v2.StateDescriptor<T> stateDescriptor) + org.apache.flink.api.common.state.v2.StateDescriptor<T> stateDescriptor) throws Exception { if (asyncKeyedStateBackend != null) { diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java index c97dbec6c86..1859ffc2f8a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java @@ -269,7 +269,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { // TODO: Reconstruct this after StateManager is ready in FLIP-410. public <T> org.apache.flink.api.common.state.v2.ValueState<T> getValueState( - org.apache.flink.runtime.state.v2.ValueStateDescriptor<T> stateProperties) { + org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> stateProperties) { org.apache.flink.runtime.state.v2.KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStoreV2(stateProperties); stateProperties.initializeSerializerUnlessSet(this::createSerializer); @@ -277,7 +277,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { } public <T> org.apache.flink.api.common.state.v2.ListState<T> getListState( - org.apache.flink.runtime.state.v2.ListStateDescriptor<T> stateProperties) { + org.apache.flink.api.common.state.v2.ListStateDescriptor<T> stateProperties) { org.apache.flink.runtime.state.v2.KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStoreV2(stateProperties); stateProperties.initializeSerializerUnlessSet(this::createSerializer); @@ -285,7 +285,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { } public <UK, UV> org.apache.flink.api.common.state.v2.MapState<UK, UV> getMapState( - org.apache.flink.runtime.state.v2.MapStateDescriptor<UK, UV> stateProperties) { + org.apache.flink.api.common.state.v2.MapStateDescriptor<UK, UV> stateProperties) { org.apache.flink.runtime.state.v2.KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStoreV2(stateProperties); stateProperties.initializeSerializerUnlessSet(this::createSerializer); @@ -293,7 +293,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { } public <T> org.apache.flink.api.common.state.v2.ReducingState<T> getReducingState( - org.apache.flink.runtime.state.v2.ReducingStateDescriptor<T> stateProperties) { + org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T> stateProperties) { org.apache.flink.runtime.state.v2.KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStoreV2(stateProperties); stateProperties.initializeSerializerUnlessSet(this::createSerializer); @@ -302,7 +302,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { public <IN, ACC, OUT> org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> getAggregatingState( - org.apache.flink.runtime.state.v2.AggregatingStateDescriptor<IN, ACC, OUT> + org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) { org.apache.flink.runtime.state.v2.KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStoreV2(stateProperties); @@ -312,7 +312,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { private org.apache.flink.runtime.state.v2.KeyedStateStore checkPreconditionsAndGetKeyedStateStoreV2( - org.apache.flink.runtime.state.v2.StateDescriptor<?> stateDescriptor) { + org.apache.flink.api.common.state.v2.StateDescriptor<?> stateDescriptor) { checkNotNull(stateDescriptor, "The state properties must not be null"); checkState( supportKeyedStateApiSet == SupportKeyedStateApiSet.STATE_V2, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java index 2311b361fea..0afe85ec5c1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.asyncprocessing; import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.api.common.state.v2.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.java.tuple.Tuple2; @@ -36,7 +37,6 @@ import org.apache.flink.runtime.state.StateBackendTestUtils; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.v2.AbstractValueState; -import org.apache.flink.runtime.state.v2.ValueStateDescriptor; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.ThrowingRunnable; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java index 10fe3f4d200..ace17d9671d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java @@ -137,7 +137,7 @@ public class StateBackendTestUtils { S getOrCreateKeyedState( N defaultNamespace, TypeSerializer<N> namespaceSerializer, - org.apache.flink.runtime.state.v2.StateDescriptor<SV> stateDesc) + org.apache.flink.api.common.state.v2.StateDescriptor<SV> stateDesc) throws Exception { stateDesc.initializeSerializerUnlessSet(new ExecutionConfig()); return (S) innerStateSupplier.get(); @@ -148,7 +148,7 @@ public class StateBackendTestUtils { public <N, S extends InternalKeyedState, SV> S createStateInternal( @Nonnull N defaultNamespace, @Nonnull TypeSerializer<N> namespaceSerializer, - @Nonnull org.apache.flink.runtime.state.v2.StateDescriptor<SV> stateDesc) + @Nonnull org.apache.flink.api.common.state.v2.StateDescriptor<SV> stateDesc) throws Exception { stateDesc.initializeSerializerUnlessSet(new ExecutionConfig()); return (S) innerStateSupplier.get(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest.java index e9e890da864..1e8dc088ede 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.state.v2; import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java index cae8c80da9a..19c408025b5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.state.v2; import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.api.common.state.v2.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; import org.apache.flink.runtime.asyncprocessing.StateExecutor; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractListStateTest.java index 8f87102d50a..456b23dfd3b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractListStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractListStateTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.state.v2; +import org.apache.flink.api.common.state.v2.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.runtime.asyncprocessing.StateRequestType; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractMapStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractMapStateTest.java index 57eb4029664..5d08c561b0c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractMapStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractMapStateTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.state.v2; +import org.apache.flink.api.common.state.v2.MapStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.asyncprocessing.StateRequestType; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractReducingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractReducingStateTest.java index 62066da5744..9c4de234c41 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractReducingStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractReducingStateTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.state.v2; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.state.v2.ReducingStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractValueStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractValueStateTest.java index 228ec3073ea..307e7063e32 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractValueStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractValueStateTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.state.v2; +import org.apache.flink.api.common.state.v2.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.runtime.asyncprocessing.StateRequestType; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AsyncKeyedStateBackendAdaptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AsyncKeyedStateBackendAdaptorTest.java index 371e4ef3ab3..9b0dd333644 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AsyncKeyedStateBackendAdaptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AsyncKeyedStateBackendAdaptorTest.java @@ -21,6 +21,12 @@ package org.apache.flink.runtime.state.v2; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.v2.ListStateDescriptor; +import org.apache.flink.api.common.state.v2.MapStateDescriptor; +import org.apache.flink.api.common.state.v2.ReducingStateDescriptor; +import org.apache.flink.api.common.state.v2.StateDescriptor; +import org.apache.flink.api.common.state.v2.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java index b3fd578474c..a1a3165ba82 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.state.v2; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.api.common.state.v2.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java index 53fc130d9f7..f61696c7485 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java @@ -257,13 +257,14 @@ class StreamingRuntimeContextTest { final AtomicReference<Object> descriptorCapture = new AtomicReference<>(); StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config, true); - org.apache.flink.runtime.state.v2.ValueStateDescriptor<TaskInfo> descr = - new org.apache.flink.runtime.state.v2.ValueStateDescriptor<>( + org.apache.flink.api.common.state.v2.ValueStateDescriptor<TaskInfo> descr = + new org.apache.flink.api.common.state.v2.ValueStateDescriptor<>( "name", TypeInformation.of(TaskInfo.class)); context.getValueState(descr); - org.apache.flink.runtime.state.v2.ValueStateDescriptor<?> descrIntercepted = - (org.apache.flink.runtime.state.v2.ValueStateDescriptor<?>) descriptorCapture.get(); + org.apache.flink.api.common.state.v2.ValueStateDescriptor<?> descrIntercepted = + (org.apache.flink.api.common.state.v2.ValueStateDescriptor<?>) + descriptorCapture.get(); TypeSerializer<?> serializer = descrIntercepted.getSerializer(); // check that the Path class is really registered, i.e., the execution config was applied @@ -281,13 +282,14 @@ class StreamingRuntimeContextTest { final AtomicReference<Object> descriptorCapture = new AtomicReference<>(); StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config, true); - org.apache.flink.runtime.state.v2.ListStateDescriptor<TaskInfo> descr = - new org.apache.flink.runtime.state.v2.ListStateDescriptor<>( + org.apache.flink.api.common.state.v2.ListStateDescriptor<TaskInfo> descr = + new org.apache.flink.api.common.state.v2.ListStateDescriptor<>( "name", TypeInformation.of(TaskInfo.class)); context.getListState(descr); - org.apache.flink.runtime.state.v2.ListStateDescriptor<?> descrIntercepted = - (org.apache.flink.runtime.state.v2.ListStateDescriptor<?>) descriptorCapture.get(); + org.apache.flink.api.common.state.v2.ListStateDescriptor<?> descrIntercepted = + (org.apache.flink.api.common.state.v2.ListStateDescriptor<?>) + descriptorCapture.get(); TypeSerializer<?> serializer = descrIntercepted.getSerializer(); // check that the Path class is really registered, i.e., the execution config was applied @@ -305,15 +307,15 @@ class StreamingRuntimeContextTest { final AtomicReference<Object> descriptorCapture = new AtomicReference<>(); StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config, true); - org.apache.flink.runtime.state.v2.MapStateDescriptor<String, TaskInfo> descr = - new org.apache.flink.runtime.state.v2.MapStateDescriptor<>( + org.apache.flink.api.common.state.v2.MapStateDescriptor<String, TaskInfo> descr = + new org.apache.flink.api.common.state.v2.MapStateDescriptor<>( "name", TypeInformation.of(String.class), TypeInformation.of(TaskInfo.class)); context.getMapState(descr); - org.apache.flink.runtime.state.v2.MapStateDescriptor<?, ?> descrIntercepted = - (org.apache.flink.runtime.state.v2.MapStateDescriptor<?, ?>) + org.apache.flink.api.common.state.v2.MapStateDescriptor<?, ?> descrIntercepted = + (org.apache.flink.api.common.state.v2.MapStateDescriptor<?, ?>) descriptorCapture.get(); TypeSerializer<?> serializer = descrIntercepted.getSerializer(); @@ -336,14 +338,14 @@ class StreamingRuntimeContextTest { @SuppressWarnings("unchecked") ReduceFunction<TaskInfo> reducer = (ReduceFunction<TaskInfo>) mock(ReduceFunction.class); - org.apache.flink.runtime.state.v2.ReducingStateDescriptor<TaskInfo> descr = - new org.apache.flink.runtime.state.v2.ReducingStateDescriptor<>( + org.apache.flink.api.common.state.v2.ReducingStateDescriptor<TaskInfo> descr = + new org.apache.flink.api.common.state.v2.ReducingStateDescriptor<>( "name", reducer, TypeInformation.of(TaskInfo.class)); context.getReducingState(descr); - org.apache.flink.runtime.state.v2.ReducingStateDescriptor<?> descrIntercepted = - (org.apache.flink.runtime.state.v2.ReducingStateDescriptor<?>) + org.apache.flink.api.common.state.v2.ReducingStateDescriptor<?> descrIntercepted = + (org.apache.flink.api.common.state.v2.ReducingStateDescriptor<?>) descriptorCapture.get(); TypeSerializer<?> serializer = descrIntercepted.getSerializer(); @@ -367,15 +369,15 @@ class StreamingRuntimeContextTest { AggregateFunction<String, TaskInfo, String> aggregate = (AggregateFunction<String, TaskInfo, String>) mock(AggregateFunction.class); - org.apache.flink.runtime.state.v2.AggregatingStateDescriptor<String, TaskInfo, String> + org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<String, TaskInfo, String> descr = - new org.apache.flink.runtime.state.v2.AggregatingStateDescriptor<>( + new org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<>( "name", aggregate, TypeInformation.of(TaskInfo.class)); context.getAggregatingState(descr); - org.apache.flink.runtime.state.v2.AggregatingStateDescriptor<?, ?, ?> descrIntercepted = - (org.apache.flink.runtime.state.v2.AggregatingStateDescriptor<?, ?, ?>) + org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<?, ?, ?> descrIntercepted = + (org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<?, ?, ?>) descriptorCapture.get(); TypeSerializer<?> serializer = descrIntercepted.getSerializer(); @@ -492,7 +494,7 @@ class StreamingRuntimeContextTest { .getOrCreateKeyedState( any(), any(TypeSerializer.class), - any(org.apache.flink.runtime.state.v2.StateDescriptor.class)); + any(org.apache.flink.api.common.state.v2.StateDescriptor.class)); operator.initializeState(streamTaskStateManager); if (!stateV2) { diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStAggregatingState.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStAggregatingState.java index abec5d6c83c..ba05170bf20 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStAggregatingState.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStAggregatingState.java @@ -19,6 +19,7 @@ package org.apache.flink.state.forst; import org.apache.flink.api.common.state.v2.AggregatingState; +import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputSerializer; @@ -31,7 +32,6 @@ import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.v2.AbstractAggregatingState; -import org.apache.flink.runtime.state.v2.AggregatingStateDescriptor; import org.apache.flink.util.Preconditions; import org.forstdb.ColumnFamilyHandle; diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java index fce143303c1..5cc7e2f3d2b 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java @@ -19,7 +19,12 @@ package org.apache.flink.state.forst; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.v2.ListStateDescriptor; +import org.apache.flink.api.common.state.v2.ReducingStateDescriptor; import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.api.common.state.v2.StateDescriptor; +import org.apache.flink.api.common.state.v2.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.java.tuple.Tuple2; @@ -48,12 +53,7 @@ import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement; import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; -import org.apache.flink.runtime.state.v2.AggregatingStateDescriptor; -import org.apache.flink.runtime.state.v2.ListStateDescriptor; -import org.apache.flink.runtime.state.v2.ReducingStateDescriptor; import org.apache.flink.runtime.state.v2.RegisteredKeyValueStateBackendMetaInfo; -import org.apache.flink.runtime.state.v2.StateDescriptor; -import org.apache.flink.runtime.state.v2.ValueStateDescriptor; import org.apache.flink.runtime.state.v2.internal.InternalKeyedState; import org.apache.flink.runtime.state.v2.ttl.TtlStateFactory; import org.apache.flink.state.forst.snapshot.ForStSnapshotStrategyBase; diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java index f3bfc29cb17..e906ad0ffb4 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java @@ -19,6 +19,7 @@ package org.apache.flink.state.forst; import org.apache.flink.api.common.state.v2.ListState; +import org.apache.flink.api.common.state.v2.ListStateDescriptor; import org.apache.flink.api.common.state.v2.StateFuture; import org.apache.flink.api.common.state.v2.StateIterator; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -35,7 +36,6 @@ import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.v2.AbstractListState; -import org.apache.flink.runtime.state.v2.ListStateDescriptor; import org.forstdb.ColumnFamilyHandle; diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapState.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapState.java index 1c7721ddb66..af96d8e9d5c 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapState.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapState.java @@ -18,7 +18,9 @@ package org.apache.flink.state.forst; import org.apache.flink.api.common.state.v2.MapState; +import org.apache.flink.api.common.state.v2.MapStateDescriptor; import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.api.common.state.v2.StateDescriptor; import org.apache.flink.api.common.state.v2.StateIterator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; @@ -31,8 +33,6 @@ import org.apache.flink.runtime.asyncprocessing.StateRequestHandler; import org.apache.flink.runtime.asyncprocessing.StateRequestType; import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder; import org.apache.flink.runtime.state.v2.AbstractMapState; -import org.apache.flink.runtime.state.v2.MapStateDescriptor; -import org.apache.flink.runtime.state.v2.StateDescriptor; import org.apache.flink.util.Preconditions; import org.forstdb.ColumnFamilyHandle; diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStReducingState.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStReducingState.java index da6e1dfbb7c..2f32fc3577d 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStReducingState.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStReducingState.java @@ -18,6 +18,7 @@ package org.apache.flink.state.forst; +import org.apache.flink.api.common.state.v2.ReducingStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputSerializer; @@ -30,7 +31,6 @@ import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.v2.AbstractReducingState; -import org.apache.flink.runtime.state.v2.ReducingStateDescriptor; import org.apache.flink.util.Preconditions; import org.forstdb.ColumnFamilyHandle; diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java index a3d2c41d0b1..8a6a4dc0c08 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java @@ -19,6 +19,7 @@ package org.apache.flink.state.forst; import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.api.common.state.v2.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputSerializer; @@ -31,7 +32,6 @@ import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.v2.AbstractValueState; -import org.apache.flink.runtime.state.v2.ValueStateDescriptor; import org.apache.flink.util.Preconditions; import org.forstdb.ColumnFamilyHandle; diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java index c11521a5ed0..5db0c25aa09 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java @@ -19,8 +19,12 @@ package org.apache.flink.state.forst; import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.v2.ListStateDescriptor; +import org.apache.flink.api.common.state.v2.MapStateDescriptor; import org.apache.flink.api.common.state.v2.State; import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.api.common.state.v2.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.tuple.Tuple2; @@ -36,10 +40,6 @@ import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; -import org.apache.flink.runtime.state.v2.AggregatingStateDescriptor; -import org.apache.flink.runtime.state.v2.ListStateDescriptor; -import org.apache.flink.runtime.state.v2.MapStateDescriptor; -import org.apache.flink.runtime.state.v2.ValueStateDescriptor; import org.apache.flink.runtime.state.v2.internal.InternalPartitionedState; import org.apache.flink.util.function.BiFunctionWithException; import org.apache.flink.util.function.FunctionWithException; diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStListStateTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStListStateTest.java index c9adc53a263..1da3a146f31 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStListStateTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStListStateTest.java @@ -18,8 +18,8 @@ package org.apache.flink.state.forst; +import org.apache.flink.api.common.state.v2.ListStateDescriptor; import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.runtime.state.v2.ListStateDescriptor; import org.apache.flink.runtime.state.v2.internal.InternalListState; import org.junit.jupiter.api.Test; diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java index dc333dd4e25..7d330a53add 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java @@ -18,6 +18,7 @@ package org.apache.flink.state.forst.snapshot; +import org.apache.flink.api.common.state.v2.StateDescriptor; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.checkpoint.CheckpointOptions; @@ -28,7 +29,6 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.SnapshotResources; import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory; import org.apache.flink.runtime.state.v2.RegisteredKeyValueStateBackendMetaInfo; -import org.apache.flink.runtime.state.v2.StateDescriptor; import org.apache.flink.state.forst.ForStExtension; import org.apache.flink.state.forst.ForStOperationUtils; import org.apache.flink.state.forst.ForStStateDataTransfer; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java index 14752d53a43..4c6a2e82968 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java @@ -70,14 +70,14 @@ public class MockOperatorStateStore @Override public <K, V> BroadcastState<K, V> getBroadcastState( - org.apache.flink.runtime.state.v2.MapStateDescriptor<K, V> stateDescriptor) + org.apache.flink.api.common.state.v2.MapStateDescriptor<K, V> stateDescriptor) throws Exception { return getBroadcastState(StateDescriptorUtils.transformFromV2ToV1(stateDescriptor)); } @Override public <S> org.apache.flink.api.common.state.v2.ListState<S> getListState( - org.apache.flink.runtime.state.v2.ListStateDescriptor<S> stateDescriptor) + org.apache.flink.api.common.state.v2.ListStateDescriptor<S> stateDescriptor) throws Exception { return new OperatorListStateAdaptor<>( getListState(StateDescriptorUtils.transformFromV2ToV1(stateDescriptor))); @@ -85,7 +85,7 @@ public class MockOperatorStateStore @Override public <S> org.apache.flink.api.common.state.v2.ListState<S> getUnionListState( - org.apache.flink.runtime.state.v2.ListStateDescriptor<S> stateDescriptor) + org.apache.flink.api.common.state.v2.ListStateDescriptor<S> stateDescriptor) throws Exception { return new OperatorListStateAdaptor<>( getListState(StateDescriptorUtils.transformFromV2ToV1(stateDescriptor))); diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/state/JoinRecordAsyncStateViews.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/state/JoinRecordAsyncStateViews.java index 1d690884855..31a4a3447d0 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/state/JoinRecordAsyncStateViews.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/state/JoinRecordAsyncStateViews.java @@ -20,13 +20,13 @@ package org.apache.flink.table.runtime.operators.join.stream.asyncprocessing.sta import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.v2.MapState; +import org.apache.flink.api.common.state.v2.MapStateDescriptor; import org.apache.flink.api.common.state.v2.StateFuture; import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.api.common.state.v2.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.core.state.StateFutureUtils; -import org.apache.flink.runtime.state.v2.MapStateDescriptor; -import org.apache.flink.runtime.state.v2.ValueStateDescriptor; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.operators.join.stream.utils.JoinInputSideSpec; diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/state/OuterJoinRecordAsyncStateViews.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/state/OuterJoinRecordAsyncStateViews.java index 013304a1bbc..63c19fd1dd6 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/state/OuterJoinRecordAsyncStateViews.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/state/OuterJoinRecordAsyncStateViews.java @@ -20,15 +20,15 @@ package org.apache.flink.table.runtime.operators.join.stream.asyncprocessing.sta import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.v2.MapState; +import org.apache.flink.api.common.state.v2.MapStateDescriptor; import org.apache.flink.api.common.state.v2.StateFuture; import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.api.common.state.v2.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.core.state.StateFutureUtils; -import org.apache.flink.runtime.state.v2.MapStateDescriptor; -import org.apache.flink.runtime.state.v2.ValueStateDescriptor; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.operators.join.stream.utils.JoinInputSideSpec;