This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4fce102f8448777f7c5eb1b7859de5cd1162621c
Author: Zakelly <zakelly....@gmail.com>
AuthorDate: Wed Jan 8 18:16:06 2025 +0800

    [FLINK-37045][Runtime] Move state descriptor v2 to flink-core
---
 .../state/v2/AggregatingStateDescriptor.java       |  4 +-
 .../api/common}/state/v2/ListStateDescriptor.java  |  5 +-
 .../api/common}/state/v2/MapStateDescriptor.java   |  5 +-
 .../common}/state/v2/ReducingStateDescriptor.java  |  4 +-
 .../api/common}/state/v2/StateDescriptor.java      |  7 +-
 .../common}/state/v2/StateSerializerReference.java |  6 +-
 .../api/common}/state/v2/ValueStateDescriptor.java |  5 +-
 .../state/v2/AggregatingStateDescriptorTest.java   |  2 +-
 .../common}/state/v2/ListStateDescriptorTest.java  |  2 +-
 .../common}/state/v2/MapStateDescriptorTest.java   |  2 +-
 .../state/v2/ReducingStateDescriptorTest.java      |  2 +-
 .../api/common}/state/v2/StateDescriptorTest.java  |  2 +-
 .../common}/state/v2/ValueStateDescriptorTest.java |  2 +-
 .../impl/context/DefaultStateManager.java          | 10 +--
 .../AbstractAsyncStateStreamOperator.java          |  2 +-
 .../AbstractAsyncStateStreamOperatorV2.java        |  2 +-
 .../runtime/state/AsyncKeyedStateBackend.java      |  2 +-
 .../runtime/state/DefaultOperatorStateBackend.java |  6 +-
 .../runtime/state/v2/AbstractAggregatingState.java |  1 +
 .../flink/runtime/state/v2/AbstractKeyedState.java |  1 +
 .../flink/runtime/state/v2/AbstractListState.java  |  1 +
 .../flink/runtime/state/v2/AbstractMapState.java   |  1 +
 .../runtime/state/v2/AbstractReducingState.java    |  1 +
 .../flink/runtime/state/v2/AbstractValueState.java |  1 +
 .../state/v2/AggregatingStateDescriptor.java       | 48 ++----------
 .../runtime/state/v2/DefaultKeyedStateStore.java   |  5 ++
 .../flink/runtime/state/v2/KeyedStateStore.java    |  5 ++
 .../runtime/state/v2/ListStateDescriptor.java      | 29 ++-----
 .../flink/runtime/state/v2/MapStateDescriptor.java | 89 ++--------------------
 .../flink/runtime/state/v2/OperatorStateStore.java |  2 +
 .../runtime/state/v2/ReducingStateDescriptor.java  | 43 ++---------
 .../v2/RegisteredKeyValueStateBackendMetaInfo.java |  1 +
 .../runtime/state/v2/StateDescriptorUtils.java     |  7 ++
 .../runtime/state/v2/ValueStateDescriptor.java     | 29 ++-----
 .../v2/adaptor/AsyncKeyedStateBackendAdaptor.java  |  2 +-
 .../runtime/state/v2/ttl/TtlStateFactory.java      | 12 +--
 .../StreamGroupedReduceAsyncStateOperator.java     |  2 +-
 .../api/operators/StreamOperatorStateHandler.java  |  2 +-
 .../api/operators/StreamingRuntimeContext.java     | 12 +--
 .../AsyncExecutionControllerTest.java              |  2 +-
 .../flink/runtime/state/StateBackendTestUtils.java |  4 +-
 .../state/v2/AbstractAggregatingStateTest.java     |  1 +
 .../state/v2/AbstractKeyedStateTestBase.java       |  1 +
 .../runtime/state/v2/AbstractListStateTest.java    |  1 +
 .../runtime/state/v2/AbstractMapStateTest.java     |  1 +
 .../state/v2/AbstractReducingStateTest.java        |  1 +
 .../runtime/state/v2/AbstractValueStateTest.java   |  1 +
 .../v2/AsyncKeyedStateBackendAdaptorTest.java      |  6 ++
 .../runtime/state/v2/StateBackendTestV2Base.java   |  1 +
 .../api/operators/StreamingRuntimeContextTest.java | 44 ++++++-----
 .../flink/state/forst/ForStAggregatingState.java   |  2 +-
 .../flink/state/forst/ForStKeyedStateBackend.java  | 10 +--
 .../apache/flink/state/forst/ForStListState.java   |  2 +-
 .../apache/flink/state/forst/ForStMapState.java    |  4 +-
 .../flink/state/forst/ForStReducingState.java      |  2 +-
 .../apache/flink/state/forst/ForStValueState.java  |  2 +-
 .../state/forst/ForStDBOperationTestBase.java      |  8 +-
 .../flink/state/forst/ForStListStateTest.java      |  2 +-
 .../ForStIncrementalSnapshotStrategyTest.java      |  2 +-
 .../collect/utils/MockOperatorStateStore.java      |  6 +-
 .../state/JoinRecordAsyncStateViews.java           |  4 +-
 .../state/OuterJoinRecordAsyncStateViews.java      |  4 +-
 62 files changed, 176 insertions(+), 299 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AggregatingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/AggregatingStateDescriptor.java
similarity index 96%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AggregatingStateDescriptor.java
copy to flink-core/src/main/java/org/apache/flink/api/common/state/v2/AggregatingStateDescriptor.java
index 1865647c792..26af7de75f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AggregatingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/AggregatingStateDescriptor.java
@@ -16,8 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.state.v2;
+package org.apache.flink.api.common.state.v2;
 
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -36,6 +37,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * @param <ACC> The type of the accumulator (intermediate aggregation state).
  * @param <OUT> The type of the values that are returned from the state.
  */
+@Experimental
 public class AggregatingStateDescriptor<IN, ACC, OUT> extends StateDescriptor<ACC> {
 
     private final AggregateFunction<IN, ACC, OUT> aggregateFunction;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ListStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ListStateDescriptor.java
similarity index 94%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ListStateDescriptor.java
copy to flink-core/src/main/java/org/apache/flink/api/common/state/v2/ListStateDescriptor.java
index 3e41ee8b1f0..1409e6bc52e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ListStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ListStateDescriptor.java
@@ -16,9 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.state.v2;
+package org.apache.flink.api.common.state.v2;
 
-import org.apache.flink.api.common.state.v2.ListState;
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
@@ -30,6 +30,7 @@ import javax.annotation.Nonnull;
  *
  * @param <T> The type of each value that the list state can hold.
  */
+@Experimental
 public class ListStateDescriptor<T> extends StateDescriptor<T> {
 
     /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/MapStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/MapStateDescriptor.java
similarity index 97%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/MapStateDescriptor.java
copy to flink-core/src/main/java/org/apache/flink/api/common/state/v2/MapStateDescriptor.java
index f691094b71b..adfe55a5c83 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/MapStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/MapStateDescriptor.java
@@ -16,12 +16,12 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.state.v2;
+package org.apache.flink.api.common.state.v2;
 
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.SerializerFactory;
-import org.apache.flink.api.common.state.v2.MapState;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
@@ -35,6 +35,7 @@ import javax.annotation.Nullable;
  * @param <UK> The type of the user key for this map state.
  * @param <UV> The type of the values that the map state can hold.
  */
+@Experimental
 public class MapStateDescriptor<UK, UV> extends StateDescriptor<UV> {
 
     /** The serializer for the user key. */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ReducingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ReducingStateDescriptor.java
similarity index 96%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ReducingStateDescriptor.java
copy to flink-core/src/main/java/org/apache/flink/api/common/state/v2/ReducingStateDescriptor.java
index f93a07c3d94..f3bbf18fbe1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ReducingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ReducingStateDescriptor.java
@@ -16,8 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.state.v2;
+package org.apache.flink.api.common.state.v2;
 
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -31,6 +32,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * @param <T> The type of the values that can be added to the state.
  */
+@Experimental
 public class ReducingStateDescriptor<T> extends StateDescriptor<T> {
 
     private final ReduceFunction<T> reduceFunction;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/StateDescriptor.java
similarity index 97%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java
rename to flink-core/src/main/java/org/apache/flink/api/common/state/v2/StateDescriptor.java
index d7f19fc4a13..3b16231462c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/StateDescriptor.java
@@ -16,8 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.state.v2;
+package org.apache.flink.api.common.state.v2;
 
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.SerializerFactory;
@@ -38,12 +39,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * @param <T> The type of the value of the state object described by this state descriptor.
  */
-@Internal
+@Experimental
 public abstract class StateDescriptor<T> implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
     /** An enumeration of the types of supported states. */
+    @Internal
     public enum Type {
         VALUE,
         LIST,
@@ -184,5 +186,6 @@ public abstract class StateDescriptor<T> implements Serializable {
     }
 
     /** Return the specific {@code Type} of described state. */
+    @Internal
     public abstract Type getType();
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateSerializerReference.java b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/StateSerializerReference.java
similarity index 95%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateSerializerReference.java
rename to flink-core/src/main/java/org/apache/flink/api/common/state/v2/StateSerializerReference.java
index 8b35b2a5487..d71c949301b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateSerializerReference.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/StateSerializerReference.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.state.v2;
+package org.apache.flink.api.common.state.v2;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -36,10 +36,12 @@ import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * A reference to a serializer. This also provides functions for lazy initialization.
+ * Package-private for internal use only.
  *
  * @param <T> the type for serialization.
  */
-public class StateSerializerReference<T> extends AtomicReference<TypeSerializer<T>> {
+@Internal
+class StateSerializerReference<T> extends AtomicReference<TypeSerializer<T>> {
     private static final Logger LOG = LoggerFactory.getLogger(StateSerializerReference.class);
 
     /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ValueStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ValueStateDescriptor.java
similarity index 95%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ValueStateDescriptor.java
copy to flink-core/src/main/java/org/apache/flink/api/common/state/v2/ValueStateDescriptor.java
index 20437cd4ec7..dc5b1251cbd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ValueStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ValueStateDescriptor.java
@@ -16,9 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.state.v2;
+package org.apache.flink.api.common.state.v2;
 
-import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
@@ -30,6 +30,7 @@ import javax.annotation.Nonnull;
  *
  * @param <T> The type of the values that the value state can hold.
  */
+@Experimental
 public class ValueStateDescriptor<T> extends StateDescriptor<T> {
 
     /**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AggregatingStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/v2/AggregatingStateDescriptorTest.java
similarity index 98%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AggregatingStateDescriptorTest.java
rename to flink-core/src/test/java/org/apache/flink/api/common/state/v2/AggregatingStateDescriptorTest.java
index 930a0c69124..0b0685acfff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AggregatingStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/v2/AggregatingStateDescriptorTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.state.v2;
+package org.apache.flink.api.common.state.v2;
 
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/ListStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/v2/ListStateDescriptorTest.java
similarity index 97%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/ListStateDescriptorTest.java
rename to flink-core/src/test/java/org/apache/flink/api/common/state/v2/ListStateDescriptorTest.java
index 9df21b4f1c9..e719ae2b8ed 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/ListStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/v2/ListStateDescriptorTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.state.v2;
+package org.apache.flink.api.common.state.v2;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.core.testutils.CommonTestUtils;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/MapStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/v2/MapStateDescriptorTest.java
similarity index 98%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/MapStateDescriptorTest.java
rename to flink-core/src/test/java/org/apache/flink/api/common/state/v2/MapStateDescriptorTest.java
index 7ea88ad79b5..8fcb9da187a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/MapStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/v2/MapStateDescriptorTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.state.v2;
+package org.apache.flink.api.common.state.v2;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.core.testutils.CommonTestUtils;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/ReducingStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/v2/ReducingStateDescriptorTest.java
similarity index 98%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/ReducingStateDescriptorTest.java
rename to flink-core/src/test/java/org/apache/flink/api/common/state/v2/ReducingStateDescriptorTest.java
index f1f789bca61..f3879ed7be0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/ReducingStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/v2/ReducingStateDescriptorTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.state.v2;
+package org.apache.flink.api.common.state.v2;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/v2/StateDescriptorTest.java
similarity index 99%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateDescriptorTest.java
rename to flink-core/src/test/java/org/apache/flink/api/common/state/v2/StateDescriptorTest.java
index f0139114a49..1c1587c68be 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/v2/StateDescriptorTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.state.v2;
+package org.apache.flink.api.common.state.v2;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.StateTtlConfig;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/ValueStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/v2/ValueStateDescriptorTest.java
similarity index 97%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/ValueStateDescriptorTest.java
rename to flink-core/src/test/java/org/apache/flink/api/common/state/v2/ValueStateDescriptorTest.java
index f1bf3820825..edcdf42d7ce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/ValueStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/v2/ValueStateDescriptorTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.state.v2;
+package org.apache.flink.api.common.state.v2;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.core.testutils.CommonTestUtils;
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultStateManager.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultStateManager.java
index d4074d1cfdc..39018a4b5ff 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultStateManager.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultStateManager.java
@@ -27,18 +27,18 @@ import org.apache.flink.api.common.state.ReducingStateDeclaration;
 import org.apache.flink.api.common.state.StateDeclaration;
 import org.apache.flink.api.common.state.ValueStateDeclaration;
 import org.apache.flink.api.common.state.v2.AggregatingState;
+import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.v2.ListState;
+import org.apache.flink.api.common.state.v2.ListStateDescriptor;
 import org.apache.flink.api.common.state.v2.MapState;
+import org.apache.flink.api.common.state.v2.MapStateDescriptor;
 import org.apache.flink.api.common.state.v2.ReducingState;
+import org.apache.flink.api.common.state.v2.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.datastream.api.context.StateManager;
-import org.apache.flink.runtime.state.v2.AggregatingStateDescriptor;
-import org.apache.flink.runtime.state.v2.ListStateDescriptor;
-import org.apache.flink.runtime.state.v2.MapStateDescriptor;
 import org.apache.flink.runtime.state.v2.OperatorStateStore;
-import org.apache.flink.runtime.state.v2.ReducingStateDescriptor;
-import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.util.Preconditions;
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
index 26c948ffc48..ad9ce787cac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.api.common.state.v2.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
@@ -32,7 +33,6 @@ import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager;
 import org.apache.flink.runtime.event.WatermarkEvent;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
-import org.apache.flink.runtime.state.v2.StateDescriptor;
 import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.Input;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java
index 5a60c455b73..ab3eff901fa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.asyncprocessing.operators;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.api.common.state.v2.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
@@ -29,7 +30,6 @@ import org.apache.flink.runtime.asyncprocessing.RecordContext;
 import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
-import org.apache.flink.runtime.state.v2.StateDescriptor;
 import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java
index bdd1571fbce..21fdf4b6e2f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java
@@ -21,13 +21,13 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.InternalCheckpointListener;
 import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.api.common.state.v2.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
 import org.apache.flink.runtime.asyncprocessing.RecordContext;
 import org.apache.flink.runtime.asyncprocessing.StateExecutor;
 import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
 import org.apache.flink.runtime.checkpoint.SnapshotType;
-import org.apache.flink.runtime.state.v2.StateDescriptor;
 import org.apache.flink.runtime.state.v2.internal.InternalKeyedState;
 import org.apache.flink.util.Disposable;
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 921409ad31a..f2f4e1cf97a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -224,14 +224,14 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 
     @Override
     public <K, V> BroadcastState<K, V> getBroadcastState(
-            org.apache.flink.runtime.state.v2.MapStateDescriptor<K, V> stateDescriptor)
+            org.apache.flink.api.common.state.v2.MapStateDescriptor<K, V> stateDescriptor)
             throws Exception {
         return getBroadcastState(StateDescriptorUtils.transformFromV2ToV1(stateDescriptor));
     }
 
     @Override
     public <S> org.apache.flink.api.common.state.v2.ListState<S> getListState(
-            org.apache.flink.runtime.state.v2.ListStateDescriptor<S> stateDescriptor)
+            org.apache.flink.api.common.state.v2.ListStateDescriptor<S> stateDescriptor)
             throws Exception {
         return new OperatorListStateAdaptor<>(
                 getListState(StateDescriptorUtils.transformFromV2ToV1(stateDescriptor)));
@@ -239,7 +239,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 
     @Override
     public <S> org.apache.flink.api.common.state.v2.ListState<S> getUnionListState(
-            org.apache.flink.runtime.state.v2.ListStateDescriptor<S> stateDescriptor)
+            org.apache.flink.api.common.state.v2.ListStateDescriptor<S> stateDescriptor)
             throws Exception {
         return new OperatorListStateAdaptor<>(
                 getListState(StateDescriptorUtils.transformFromV2ToV1(stateDescriptor)));
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractAggregatingState.java
index 95850d928ce..222c3688913 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractAggregatingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractAggregatingState.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.v2;
 
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.state.v2.AggregatingState;
+import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.v2.StateFuture;
 import org.apache.flink.core.state.StateFutureUtils;
 import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractKeyedState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractKeyedState.java
index d1492410992..2056d710ce6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractKeyedState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractKeyedState.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.state.v2;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.api.common.state.v2.StateDescriptor;
 import org.apache.flink.api.common.state.v2.StateFuture;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractListState.java
index 5c7a6ba6bf3..263de23f31a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractListState.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.state.v2;
 
 import org.apache.flink.api.common.state.v2.ListState;
+import org.apache.flink.api.common.state.v2.ListStateDescriptor;
 import org.apache.flink.api.common.state.v2.StateFuture;
 import org.apache.flink.api.common.state.v2.StateIterator;
 import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractMapState.java
index 108b3128344..fb3ad546aed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractMapState.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.state.v2;
 
 import org.apache.flink.api.common.state.v2.MapState;
+import org.apache.flink.api.common.state.v2.MapStateDescriptor;
 import org.apache.flink.api.common.state.v2.StateFuture;
 import org.apache.flink.api.common.state.v2.StateIterator;
 import org.apache.flink.api.java.tuple.Tuple2;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractReducingState.java
index 4fb7967bd6c..daf22c4398a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractReducingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractReducingState.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.state.v2;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.v2.ReducingState;
+import org.apache.flink.api.common.state.v2.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.v2.StateFuture;
 import org.apache.flink.core.state.StateFutureUtils;
 import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractValueState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractValueState.java
index 0affbf415f3..eda66c9cea2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractValueState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractValueState.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.state.v2;
 
 import org.apache.flink.api.common.state.v2.StateFuture;
 import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
 import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
 import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
 import org.apache.flink.runtime.asyncprocessing.StateRequestType;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AggregatingStateDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AggregatingStateDescriptor.java
index 1865647c792..078b189fbe1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AggregatingStateDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AggregatingStateDescriptor.java
@@ -24,59 +24,25 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 import javax.annotation.Nonnull;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
- * A {@link StateDescriptor} for {@link 
org.apache.flink.api.common.state.v2.AggregatingState}.
- *
- * <p>The type internally stored in the state is the type of the {@code 
Accumulator} of the {@code
- * AggregateFunction}.
- *
- * @param <IN> The type of the values that are added to the state.
- * @param <ACC> The type of the accumulator (intermediate aggregation state).
- * @param <OUT> The type of the values that are returned from the state.
+ * This class is deprecated and is only a placeholder for compatibility with EXISTING PRs. It
+ * has never been released, and will be safely removed before the 2.0 release.
  */
-public class AggregatingStateDescriptor<IN, ACC, OUT> extends 
StateDescriptor<ACC> {
-
-    private final AggregateFunction<IN, ACC, OUT> aggregateFunction;
+@Deprecated
+public class AggregatingStateDescriptor<IN, ACC, OUT>
+        extends 
org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<IN, ACC, OUT> {
 
-    /**
-     * Create a new state descriptor with the given name, function, and type.
-     *
-     * @param stateId The (unique) name for the state.
-     * @param aggregateFunction The {@code AggregateFunction} used to 
aggregate the state.
-     * @param typeInfo The type of the accumulator. The accumulator is stored 
in the state.
-     */
     public AggregatingStateDescriptor(
             @Nonnull String stateId,
             @Nonnull AggregateFunction<IN, ACC, OUT> aggregateFunction,
             @Nonnull TypeInformation<ACC> typeInfo) {
-        super(stateId, typeInfo);
-        this.aggregateFunction = checkNotNull(aggregateFunction);
+        super(stateId, aggregateFunction, typeInfo);
     }
 
-    /**
-     * Create a new {@code ReducingStateDescriptor} with the given stateId and 
the given type
-     * serializer.
-     *
-     * @param stateId The (unique) stateId for the state.
-     * @param serializer The type serializer for accumulator.
-     */
     public AggregatingStateDescriptor(
             @Nonnull String stateId,
             @Nonnull AggregateFunction<IN, ACC, OUT> aggregateFunction,
             @Nonnull TypeSerializer<ACC> serializer) {
-        super(stateId, serializer);
-        this.aggregateFunction = checkNotNull(aggregateFunction);
-    }
-
-    /** Returns the Aggregate function for this state. */
-    public AggregateFunction<IN, ACC, OUT> getAggregateFunction() {
-        return aggregateFunction;
-    }
-
-    @Override
-    public Type getType() {
-        return Type.AGGREGATING;
+        super(stateId, aggregateFunction, serializer);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/DefaultKeyedStateStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/DefaultKeyedStateStore.java
index 92eed6decd9..52565b7fad6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/DefaultKeyedStateStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/DefaultKeyedStateStore.java
@@ -20,10 +20,15 @@ package org.apache.flink.runtime.state.v2;
 
 import org.apache.flink.api.common.functions.SerializerFactory;
 import org.apache.flink.api.common.state.v2.AggregatingState;
+import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.v2.ListState;
+import org.apache.flink.api.common.state.v2.ListStateDescriptor;
 import org.apache.flink.api.common.state.v2.MapState;
+import org.apache.flink.api.common.state.v2.MapStateDescriptor;
 import org.apache.flink.api.common.state.v2.ReducingState;
+import org.apache.flink.api.common.state.v2.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
 import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/KeyedStateStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/KeyedStateStore.java
index ccb58e5c763..b47b3b51623 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/KeyedStateStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/KeyedStateStore.java
@@ -20,11 +20,16 @@ package org.apache.flink.runtime.state.v2;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.v2.AggregatingState;
+import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.v2.ListState;
+import org.apache.flink.api.common.state.v2.ListStateDescriptor;
 import org.apache.flink.api.common.state.v2.MapState;
+import org.apache.flink.api.common.state.v2.MapStateDescriptor;
 import org.apache.flink.api.common.state.v2.ReducingState;
+import org.apache.flink.api.common.state.v2.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.v2.State;
 import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
 
 import javax.annotation.Nonnull;
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ListStateDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ListStateDescriptor.java
index 3e41ee8b1f0..6e22610602c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ListStateDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ListStateDescriptor.java
@@ -18,43 +18,24 @@
 
 package org.apache.flink.runtime.state.v2;
 
-import org.apache.flink.api.common.state.v2.ListState;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 import javax.annotation.Nonnull;
 
 /**
- * {@link StateDescriptor} for {@link ListState}. This can be used to create 
partitioned list state
- * internally.
- *
- * @param <T> The type of each value that the list state can hold.
+ * This class is deprecated and is only a placeholder for compatibility with EXISTING PRs. It
+ * has never been released, and will be safely removed before the 2.0 release.
  */
-public class ListStateDescriptor<T> extends StateDescriptor<T> {
+@Deprecated
+public class ListStateDescriptor<T>
+        extends org.apache.flink.api.common.state.v2.ListStateDescriptor<T> {
 
-    /**
-     * Creates a new {@code ListStateDescriptor} with the given stateId and 
type.
-     *
-     * @param stateId The (unique) stateId for the state.
-     * @param typeInfo The type of the values in the state.
-     */
     public ListStateDescriptor(@Nonnull String stateId, @Nonnull 
TypeInformation<T> typeInfo) {
         super(stateId, typeInfo);
     }
 
-    /**
-     * Create a new {@code ListStateDescriptor} with the given stateId and the 
given type
-     * serializer.
-     *
-     * @param stateId The (unique) stateId for the state.
-     * @param serializer The type serializer for the values in the state.
-     */
     public ListStateDescriptor(@Nonnull String stateId, @Nonnull 
TypeSerializer<T> serializer) {
         super(stateId, serializer);
     }
-
-    @Override
-    public Type getType() {
-        return Type.LIST;
-    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/MapStateDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/MapStateDescriptor.java
index f691094b71b..37573e25e64 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/MapStateDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/MapStateDescriptor.java
@@ -18,105 +18,30 @@
 
 package org.apache.flink.runtime.state.v2;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.SerializerFactory;
-import org.apache.flink.api.common.state.v2.MapState;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 
 /**
- * {@link StateDescriptor} for {@link MapState}. This can be used to create 
partitioned map state
- * internally.
- *
- * @param <UK> The type of the user key for this map state.
- * @param <UV> The type of the values that the map state can hold.
+ * This class is deprecated and is only a placeholder for compatibility with EXISTING PRs. It
+ * has never been released, and will be safely removed before the 2.0 release.
  */
-public class MapStateDescriptor<UK, UV> extends StateDescriptor<UV> {
-
-    /** The serializer for the user key. */
-    @Nonnull private final StateSerializerReference<UK> userKeySerializer;
+@Deprecated
+public class MapStateDescriptor<UK, UV>
+        extends org.apache.flink.api.common.state.v2.MapStateDescriptor<UK, 
UV> {
 
-    /**
-     * Creates a new {@code MapStateDescriptor} with the given stateId and 
type.
-     *
-     * @param stateId The (unique) stateId for the state.
-     * @param userKeyTypeInfo The type of the user keys in the state.
-     * @param userValueTypeInfo The type of the values in the state.
-     */
     public MapStateDescriptor(
             @Nonnull String stateId,
             @Nonnull TypeInformation<UK> userKeyTypeInfo,
             @Nonnull TypeInformation<UV> userValueTypeInfo) {
-        super(stateId, userValueTypeInfo);
-        this.userKeySerializer = new 
StateSerializerReference<>(userKeyTypeInfo);
+        super(stateId, userKeyTypeInfo, userValueTypeInfo);
     }
 
-    /**
-     * Create a new {@code MapStateDescriptor} with the given stateId and the 
given type serializer.
-     *
-     * @param stateId The (unique) stateId for the state.
-     * @param userKeySerializer The serializer for the user keys in the state.
-     * @param userValueSerializer The serializer for the user values in the 
state.
-     */
     public MapStateDescriptor(
             @Nonnull String stateId,
             @Nonnull TypeSerializer<UK> userKeySerializer,
             @Nonnull TypeSerializer<UV> userValueSerializer) {
-        super(stateId, userValueSerializer);
-        this.userKeySerializer = new 
StateSerializerReference<>(userKeySerializer);
-    }
-
-    @Nonnull
-    public TypeSerializer<UK> getUserKeySerializer() {
-        TypeSerializer<UK> serializer = userKeySerializer.get();
-        if (serializer != null) {
-            return serializer.duplicate();
-        } else {
-            throw new IllegalStateException("Serializer not yet initialized.");
-        }
-    }
-
-    @Internal
-    @Nullable
-    public TypeInformation<UK> getUserKeyTypeInformation() {
-        return userKeySerializer.getTypeInformation();
-    }
-
-    /**
-     * Checks whether the serializer has been initialized. Serializer 
initialization is lazy, to
-     * allow parametrization of serializers with an {@link ExecutionConfig} 
via {@link
-     * #initializeSerializerUnlessSet(ExecutionConfig)}.
-     *
-     * @return True if the serializers have been initialized, false otherwise.
-     */
-    @Override
-    public boolean isSerializerInitialized() {
-        return super.isSerializerInitialized() && 
userKeySerializer.isInitialized();
-    }
-
-    /**
-     * Initializes the serializer, unless it has been initialized before.
-     *
-     * @param executionConfig The execution config to use when creating the 
serializer.
-     */
-    @Override
-    public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) 
{
-        super.initializeSerializerUnlessSet(executionConfig);
-        userKeySerializer.initializeUnlessSet(executionConfig);
-    }
-
-    @Override
-    public void initializeSerializerUnlessSet(SerializerFactory 
serializerFactory) {
-        super.initializeSerializerUnlessSet(serializerFactory);
-        userKeySerializer.initializeUnlessSet(serializerFactory);
-    }
-
-    @Override
-    public Type getType() {
-        return Type.MAP;
+        super(stateId, userKeySerializer, userValueSerializer);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/OperatorStateStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/OperatorStateStore.java
index b9a28858b4f..50bd0c0a8d0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/OperatorStateStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/OperatorStateStore.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.state.v2;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.BroadcastState;
 import org.apache.flink.api.common.state.v2.ListState;
+import org.apache.flink.api.common.state.v2.ListStateDescriptor;
+import org.apache.flink.api.common.state.v2.MapStateDescriptor;
 
 import java.util.Set;
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ReducingStateDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ReducingStateDescriptor.java
index f93a07c3d94..053b9704eaa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ReducingStateDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ReducingStateDescriptor.java
@@ -24,54 +24,25 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 import javax.annotation.Nonnull;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
- * {@link StateDescriptor} for {@link 
org.apache.flink.api.common.state.v2.ReducingState}.
- *
- * @param <T> The type of the values that can be added to the state.
+ * This class is deprecated and is only a placeholder for compatibility with EXISTING PRs. It
+ * has never been released, and will be safely removed before the 2.0 release.
  */
-public class ReducingStateDescriptor<T> extends StateDescriptor<T> {
+@Deprecated
+public class ReducingStateDescriptor<T>
+        extends 
org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T> {
 
-    private final ReduceFunction<T> reduceFunction;
-
-    /**
-     * Creates a new {@code ReducingStateDescriptor} with the given name and 
default value.
-     *
-     * @param name The (unique) name for the state.
-     * @param reduceFunction The {@code ReduceFunction} used to aggregate the 
state.
-     * @param typeInfo The type of the values in the state.
-     */
     public ReducingStateDescriptor(
             @Nonnull String name,
             @Nonnull ReduceFunction<T> reduceFunction,
             @Nonnull TypeInformation<T> typeInfo) {
-        super(name, typeInfo);
-        this.reduceFunction = checkNotNull(reduceFunction);
+        super(name, reduceFunction, typeInfo);
     }
 
-    /**
-     * Create a new {@code ReducingStateDescriptor} with the given stateId and 
the given type
-     * serializer.
-     *
-     * @param stateId The (unique) stateId for the state.
-     * @param serializer The type serializer for the values in the state.
-     */
     public ReducingStateDescriptor(
             @Nonnull String stateId,
             @Nonnull ReduceFunction<T> reduceFunction,
             @Nonnull TypeSerializer<T> serializer) {
-        super(stateId, serializer);
-        this.reduceFunction = checkNotNull(reduceFunction);
-    }
-
-    /** Returns the reduce function to be used for the reducing state. */
-    public ReduceFunction<T> getReduceFunction() {
-        return reduceFunction;
-    }
-
-    @Override
-    public Type getType() {
-        return Type.REDUCING;
+        super(stateId, reduceFunction, serializer);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/RegisteredKeyValueStateBackendMetaInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/RegisteredKeyValueStateBackendMetaInfo.java
index 7f4667e9408..45107561f9a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/RegisteredKeyValueStateBackendMetaInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/RegisteredKeyValueStateBackendMetaInfo.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state.v2;
 
+import org.apache.flink.api.common.state.v2.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptorUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptorUtils.java
index ca57a657852..8b62628488f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptorUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptorUtils.java
@@ -18,6 +18,13 @@
 
 package org.apache.flink.runtime.state.v2;
 
+import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.v2.ListStateDescriptor;
+import org.apache.flink.api.common.state.v2.MapStateDescriptor;
+import org.apache.flink.api.common.state.v2.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.v2.StateDescriptor;
+import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
+
 /**
  * Utilities for transforming {@link StateDescriptor} to {@link
  * org.apache.flink.api.common.state.StateDescriptor}.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ValueStateDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ValueStateDescriptor.java
index 20437cd4ec7..70e45ee9c3a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ValueStateDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ValueStateDescriptor.java
@@ -18,43 +18,24 @@
 
 package org.apache.flink.runtime.state.v2;
 
-import org.apache.flink.api.common.state.v2.ValueState;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 import javax.annotation.Nonnull;
 
 /**
- * {@link StateDescriptor} for {@link ValueState}. This can be used to create 
partitioned value
- * state internally.
- *
- * @param <T> The type of the values that the value state can hold.
+ * This class is deprecated and is only a placeholder for compatibility with EXISTING PRs. It
+ * has never been released, and will be safely removed before the 2.0 release.
  */
-public class ValueStateDescriptor<T> extends StateDescriptor<T> {
+@Deprecated
+public class ValueStateDescriptor<T>
+        extends org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> {
 
-    /**
-     * Creates a new {@code ValueStateDescriptor} with the given stateId and 
type.
-     *
-     * @param stateId The (unique) stateId for the state.
-     * @param typeInfo The type of the values in the state.
-     */
     public ValueStateDescriptor(@Nonnull String stateId, @Nonnull 
TypeInformation<T> typeInfo) {
         super(stateId, typeInfo);
     }
 
-    /**
-     * Create a new {@code ValueStateDescriptor} with the given stateId and 
the given type
-     * serializer.
-     *
-     * @param stateId The (unique) stateId for the state.
-     * @param serializer The type serializer for the values in the state.
-     */
     public ValueStateDescriptor(@Nonnull String stateId, @Nonnull 
TypeSerializer<T> serializer) {
         super(stateId, serializer);
     }
-
-    @Override
-    public Type getType() {
-        return Type.VALUE;
-    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java
index d0ec7188fd4..dae4a959d8d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state.v2.adaptor;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.state.InternalCheckpointListener;
 import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.api.common.state.v2.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.asyncprocessing.RecordContext;
 import org.apache.flink.runtime.asyncprocessing.StateExecutor;
@@ -44,7 +45,6 @@ import 
org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
-import org.apache.flink.runtime.state.v2.StateDescriptor;
 import org.apache.flink.runtime.state.v2.StateDescriptorUtils;
 import org.apache.flink.runtime.state.v2.internal.InternalKeyedState;
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlStateFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlStateFactory.java
index e4b946dbeed..8a820490228 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlStateFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlStateFactory.java
@@ -19,7 +19,13 @@
 package org.apache.flink.runtime.state.v2.ttl;
 
 import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.v2.ListStateDescriptor;
+import org.apache.flink.api.common.state.v2.MapStateDescriptor;
+import org.apache.flink.api.common.state.v2.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.api.common.state.v2.StateDescriptor;
+import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.CompositeSerializer;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -31,12 +37,6 @@ import org.apache.flink.runtime.state.ttl.TtlReduceFunction;
 import org.apache.flink.runtime.state.ttl.TtlStateContext;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.runtime.state.ttl.TtlValue;
-import org.apache.flink.runtime.state.v2.AggregatingStateDescriptor;
-import org.apache.flink.runtime.state.v2.ListStateDescriptor;
-import org.apache.flink.runtime.state.v2.MapStateDescriptor;
-import org.apache.flink.runtime.state.v2.ReducingStateDescriptor;
-import org.apache.flink.runtime.state.v2.StateDescriptor;
-import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.SupplierWithException;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceAsyncStateOperator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceAsyncStateOperator.java
index 189769d11cb..9794e25ff72 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceAsyncStateOperator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceAsyncStateOperator.java
@@ -20,9 +20,9 @@ package org.apache.flink.streaming.api.operators;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import 
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
-import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
index 56d32bf1f3f..bf01d4df0c7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
@@ -421,7 +421,7 @@ public class StreamOperatorStateHandler {
     public <N, S extends org.apache.flink.api.common.state.v2.State, T> S 
getOrCreateKeyedState(
             N defaultNamespace,
             TypeSerializer<N> namespaceSerializer,
-            org.apache.flink.runtime.state.v2.StateDescriptor<T> 
stateDescriptor)
+            org.apache.flink.api.common.state.v2.StateDescriptor<T> 
stateDescriptor)
             throws Exception {
 
         if (asyncKeyedStateBackend != null) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index c97dbec6c86..1859ffc2f8a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -269,7 +269,7 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
 
     // TODO: Reconstruct this after StateManager is ready in FLIP-410.
     public <T> org.apache.flink.api.common.state.v2.ValueState<T> 
getValueState(
-            org.apache.flink.runtime.state.v2.ValueStateDescriptor<T> 
stateProperties) {
+            org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> 
stateProperties) {
         org.apache.flink.runtime.state.v2.KeyedStateStore keyedStateStore =
                 checkPreconditionsAndGetKeyedStateStoreV2(stateProperties);
         stateProperties.initializeSerializerUnlessSet(this::createSerializer);
@@ -277,7 +277,7 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
     }
 
     public <T> org.apache.flink.api.common.state.v2.ListState<T> getListState(
-            org.apache.flink.runtime.state.v2.ListStateDescriptor<T> 
stateProperties) {
+            org.apache.flink.api.common.state.v2.ListStateDescriptor<T> 
stateProperties) {
         org.apache.flink.runtime.state.v2.KeyedStateStore keyedStateStore =
                 checkPreconditionsAndGetKeyedStateStoreV2(stateProperties);
         stateProperties.initializeSerializerUnlessSet(this::createSerializer);
@@ -285,7 +285,7 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
     }
 
     public <UK, UV> org.apache.flink.api.common.state.v2.MapState<UK, UV> 
getMapState(
-            org.apache.flink.runtime.state.v2.MapStateDescriptor<UK, UV> 
stateProperties) {
+            org.apache.flink.api.common.state.v2.MapStateDescriptor<UK, UV> 
stateProperties) {
         org.apache.flink.runtime.state.v2.KeyedStateStore keyedStateStore =
                 checkPreconditionsAndGetKeyedStateStoreV2(stateProperties);
         stateProperties.initializeSerializerUnlessSet(this::createSerializer);
@@ -293,7 +293,7 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
     }
 
     public <T> org.apache.flink.api.common.state.v2.ReducingState<T> 
getReducingState(
-            org.apache.flink.runtime.state.v2.ReducingStateDescriptor<T> 
stateProperties) {
+            org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T> 
stateProperties) {
         org.apache.flink.runtime.state.v2.KeyedStateStore keyedStateStore =
                 checkPreconditionsAndGetKeyedStateStoreV2(stateProperties);
         stateProperties.initializeSerializerUnlessSet(this::createSerializer);
@@ -302,7 +302,7 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
 
     public <IN, ACC, OUT>
             org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> 
getAggregatingState(
-                    
org.apache.flink.runtime.state.v2.AggregatingStateDescriptor<IN, ACC, OUT>
+                    
org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<IN, ACC, OUT>
                             stateProperties) {
         org.apache.flink.runtime.state.v2.KeyedStateStore keyedStateStore =
                 checkPreconditionsAndGetKeyedStateStoreV2(stateProperties);
@@ -312,7 +312,7 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
 
     private org.apache.flink.runtime.state.v2.KeyedStateStore
             checkPreconditionsAndGetKeyedStateStoreV2(
-                    org.apache.flink.runtime.state.v2.StateDescriptor<?> 
stateDescriptor) {
+                    org.apache.flink.api.common.state.v2.StateDescriptor<?> 
stateDescriptor) {
         checkNotNull(stateDescriptor, "The state properties must not be null");
         checkState(
                 supportKeyedStateApiSet == SupportKeyedStateApiSet.STATE_V2,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
index 2311b361fea..0afe85ec5c1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.asyncprocessing;
 
 import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -36,7 +37,6 @@ import org.apache.flink.runtime.state.StateBackendTestUtils;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.v2.AbstractValueState;
-import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.ThrowingRunnable;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java
index 10fe3f4d200..ace17d9671d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java
@@ -137,7 +137,7 @@ public class StateBackendTestUtils {
                 S getOrCreateKeyedState(
                         N defaultNamespace,
                         TypeSerializer<N> namespaceSerializer,
-                        org.apache.flink.runtime.state.v2.StateDescriptor<SV> 
stateDesc)
+                        
org.apache.flink.api.common.state.v2.StateDescriptor<SV> stateDesc)
                         throws Exception {
             stateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
             return (S) innerStateSupplier.get();
@@ -148,7 +148,7 @@ public class StateBackendTestUtils {
         public <N, S extends InternalKeyedState, SV> S createStateInternal(
                 @Nonnull N defaultNamespace,
                 @Nonnull TypeSerializer<N> namespaceSerializer,
-                @Nonnull org.apache.flink.runtime.state.v2.StateDescriptor<SV> 
stateDesc)
+                @Nonnull 
org.apache.flink.api.common.state.v2.StateDescriptor<SV> stateDesc)
                 throws Exception {
             stateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
             return (S) innerStateSupplier.get();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest.java
index e9e890da864..1e8dc088ede 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state.v2;
 
 import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java
index cae8c80da9a..19c408025b5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state.v2;
 
 import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.api.common.state.v2.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
 import org.apache.flink.runtime.asyncprocessing.StateExecutor;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractListStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractListStateTest.java
index 8f87102d50a..456b23dfd3b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractListStateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractListStateTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state.v2;
 
+import org.apache.flink.api.common.state.v2.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.runtime.asyncprocessing.StateRequestType;
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractMapStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractMapStateTest.java
index 57eb4029664..5d08c561b0c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractMapStateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractMapStateTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state.v2;
 
+import org.apache.flink.api.common.state.v2.MapStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.asyncprocessing.StateRequestType;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractReducingStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractReducingStateTest.java
index 62066da5744..9c4de234c41 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractReducingStateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractReducingStateTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state.v2;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.v2.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractValueStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractValueStateTest.java
index 228ec3073ea..307e7063e32 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractValueStateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractValueStateTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state.v2;
 
+import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.runtime.asyncprocessing.StateRequestType;
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AsyncKeyedStateBackendAdaptorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AsyncKeyedStateBackendAdaptorTest.java
index 371e4ef3ab3..9b0dd333644 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AsyncKeyedStateBackendAdaptorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AsyncKeyedStateBackendAdaptorTest.java
@@ -21,6 +21,12 @@ package org.apache.flink.runtime.state.v2;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.v2.ListStateDescriptor;
+import org.apache.flink.api.common.state.v2.MapStateDescriptor;
+import org.apache.flink.api.common.state.v2.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.v2.StateDescriptor;
+import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java
index b3fd578474c..a1a3165ba82 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state.v2;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
index 53fc130d9f7..f61696c7485 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
@@ -257,13 +257,14 @@ class StreamingRuntimeContextTest {
         final AtomicReference<Object> descriptorCapture = new 
AtomicReference<>();
 
         StreamingRuntimeContext context = 
createRuntimeContext(descriptorCapture, config, true);
-        org.apache.flink.runtime.state.v2.ValueStateDescriptor<TaskInfo> descr 
=
-                new org.apache.flink.runtime.state.v2.ValueStateDescriptor<>(
+        org.apache.flink.api.common.state.v2.ValueStateDescriptor<TaskInfo> 
descr =
+                new 
org.apache.flink.api.common.state.v2.ValueStateDescriptor<>(
                         "name", TypeInformation.of(TaskInfo.class));
         context.getValueState(descr);
 
-        org.apache.flink.runtime.state.v2.ValueStateDescriptor<?> 
descrIntercepted =
-                (org.apache.flink.runtime.state.v2.ValueStateDescriptor<?>) 
descriptorCapture.get();
+        org.apache.flink.api.common.state.v2.ValueStateDescriptor<?> 
descrIntercepted =
+                (org.apache.flink.api.common.state.v2.ValueStateDescriptor<?>)
+                        descriptorCapture.get();
         TypeSerializer<?> serializer = descrIntercepted.getSerializer();
 
         // check that the Path class is really registered, i.e., the execution 
config was applied
@@ -281,13 +282,14 @@ class StreamingRuntimeContextTest {
         final AtomicReference<Object> descriptorCapture = new 
AtomicReference<>();
 
         StreamingRuntimeContext context = 
createRuntimeContext(descriptorCapture, config, true);
-        org.apache.flink.runtime.state.v2.ListStateDescriptor<TaskInfo> descr =
-                new org.apache.flink.runtime.state.v2.ListStateDescriptor<>(
+        org.apache.flink.api.common.state.v2.ListStateDescriptor<TaskInfo> 
descr =
+                new org.apache.flink.api.common.state.v2.ListStateDescriptor<>(
                         "name", TypeInformation.of(TaskInfo.class));
         context.getListState(descr);
 
-        org.apache.flink.runtime.state.v2.ListStateDescriptor<?> 
descrIntercepted =
-                (org.apache.flink.runtime.state.v2.ListStateDescriptor<?>) 
descriptorCapture.get();
+        org.apache.flink.api.common.state.v2.ListStateDescriptor<?> 
descrIntercepted =
+                (org.apache.flink.api.common.state.v2.ListStateDescriptor<?>)
+                        descriptorCapture.get();
         TypeSerializer<?> serializer = descrIntercepted.getSerializer();
 
         // check that the Path class is really registered, i.e., the execution 
config was applied
@@ -305,15 +307,15 @@ class StreamingRuntimeContextTest {
         final AtomicReference<Object> descriptorCapture = new 
AtomicReference<>();
 
         StreamingRuntimeContext context = 
createRuntimeContext(descriptorCapture, config, true);
-        org.apache.flink.runtime.state.v2.MapStateDescriptor<String, TaskInfo> 
descr =
-                new org.apache.flink.runtime.state.v2.MapStateDescriptor<>(
+        org.apache.flink.api.common.state.v2.MapStateDescriptor<String, 
TaskInfo> descr =
+                new org.apache.flink.api.common.state.v2.MapStateDescriptor<>(
                         "name",
                         TypeInformation.of(String.class),
                         TypeInformation.of(TaskInfo.class));
         context.getMapState(descr);
 
-        org.apache.flink.runtime.state.v2.MapStateDescriptor<?, ?> 
descrIntercepted =
-                (org.apache.flink.runtime.state.v2.MapStateDescriptor<?, ?>)
+        org.apache.flink.api.common.state.v2.MapStateDescriptor<?, ?> 
descrIntercepted =
+                (org.apache.flink.api.common.state.v2.MapStateDescriptor<?, ?>)
                         descriptorCapture.get();
         TypeSerializer<?> serializer = descrIntercepted.getSerializer();
 
@@ -336,14 +338,14 @@ class StreamingRuntimeContextTest {
         @SuppressWarnings("unchecked")
         ReduceFunction<TaskInfo> reducer = (ReduceFunction<TaskInfo>) 
mock(ReduceFunction.class);
 
-        org.apache.flink.runtime.state.v2.ReducingStateDescriptor<TaskInfo> 
descr =
-                new 
org.apache.flink.runtime.state.v2.ReducingStateDescriptor<>(
+        org.apache.flink.api.common.state.v2.ReducingStateDescriptor<TaskInfo> 
descr =
+                new 
org.apache.flink.api.common.state.v2.ReducingStateDescriptor<>(
                         "name", reducer, TypeInformation.of(TaskInfo.class));
 
         context.getReducingState(descr);
 
-        org.apache.flink.runtime.state.v2.ReducingStateDescriptor<?> 
descrIntercepted =
-                (org.apache.flink.runtime.state.v2.ReducingStateDescriptor<?>)
+        org.apache.flink.api.common.state.v2.ReducingStateDescriptor<?> 
descrIntercepted =
+                
(org.apache.flink.api.common.state.v2.ReducingStateDescriptor<?>)
                         descriptorCapture.get();
         TypeSerializer<?> serializer = descrIntercepted.getSerializer();
 
@@ -367,15 +369,15 @@ class StreamingRuntimeContextTest {
         AggregateFunction<String, TaskInfo, String> aggregate =
                 (AggregateFunction<String, TaskInfo, String>) 
mock(AggregateFunction.class);
 
-        org.apache.flink.runtime.state.v2.AggregatingStateDescriptor<String, 
TaskInfo, String>
+        
org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<String, 
TaskInfo, String>
                 descr =
-                        new 
org.apache.flink.runtime.state.v2.AggregatingStateDescriptor<>(
+                        new 
org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<>(
                                 "name", aggregate, 
TypeInformation.of(TaskInfo.class));
 
         context.getAggregatingState(descr);
 
-        org.apache.flink.runtime.state.v2.AggregatingStateDescriptor<?, ?, ?> 
descrIntercepted =
-                
(org.apache.flink.runtime.state.v2.AggregatingStateDescriptor<?, ?, ?>)
+        org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<?, ?, 
?> descrIntercepted =
+                
(org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<?, ?, ?>)
                         descriptorCapture.get();
         TypeSerializer<?> serializer = descrIntercepted.getSerializer();
 
@@ -492,7 +494,7 @@ class StreamingRuntimeContextTest {
                 .getOrCreateKeyedState(
                         any(),
                         any(TypeSerializer.class),
-                        
any(org.apache.flink.runtime.state.v2.StateDescriptor.class));
+                        
any(org.apache.flink.api.common.state.v2.StateDescriptor.class));
 
         operator.initializeState(streamTaskStateManager);
         if (!stateV2) {
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStAggregatingState.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStAggregatingState.java
index abec5d6c83c..ba05170bf20 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStAggregatingState.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStAggregatingState.java
@@ -19,6 +19,7 @@
 package org.apache.flink.state.forst;
 
 import org.apache.flink.api.common.state.v2.AggregatingState;
+import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
@@ -31,7 +32,6 @@ import 
org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.v2.AbstractAggregatingState;
-import org.apache.flink.runtime.state.v2.AggregatingStateDescriptor;
 import org.apache.flink.util.Preconditions;
 
 import org.forstdb.ColumnFamilyHandle;
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
index fce143303c1..5cc7e2f3d2b 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
@@ -19,7 +19,12 @@ package org.apache.flink.state.forst;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.v2.ListStateDescriptor;
+import org.apache.flink.api.common.state.v2.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.api.common.state.v2.StateDescriptor;
+import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -48,12 +53,7 @@ import 
org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
 import 
org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
-import org.apache.flink.runtime.state.v2.AggregatingStateDescriptor;
-import org.apache.flink.runtime.state.v2.ListStateDescriptor;
-import org.apache.flink.runtime.state.v2.ReducingStateDescriptor;
 import 
org.apache.flink.runtime.state.v2.RegisteredKeyValueStateBackendMetaInfo;
-import org.apache.flink.runtime.state.v2.StateDescriptor;
-import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
 import org.apache.flink.runtime.state.v2.internal.InternalKeyedState;
 import org.apache.flink.runtime.state.v2.ttl.TtlStateFactory;
 import org.apache.flink.state.forst.snapshot.ForStSnapshotStrategyBase;
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java
index f3bfc29cb17..e906ad0ffb4 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java
@@ -19,6 +19,7 @@
 package org.apache.flink.state.forst;
 
 import org.apache.flink.api.common.state.v2.ListState;
+import org.apache.flink.api.common.state.v2.ListStateDescriptor;
 import org.apache.flink.api.common.state.v2.StateFuture;
 import org.apache.flink.api.common.state.v2.StateIterator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -35,7 +36,6 @@ import 
org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.v2.AbstractListState;
-import org.apache.flink.runtime.state.v2.ListStateDescriptor;
 
 import org.forstdb.ColumnFamilyHandle;
 
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapState.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapState.java
index 1c7721ddb66..af96d8e9d5c 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapState.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapState.java
@@ -18,7 +18,9 @@
 package org.apache.flink.state.forst;
 
 import org.apache.flink.api.common.state.v2.MapState;
+import org.apache.flink.api.common.state.v2.MapStateDescriptor;
 import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.api.common.state.v2.StateDescriptor;
 import org.apache.flink.api.common.state.v2.StateIterator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -31,8 +33,6 @@ import 
org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
 import org.apache.flink.runtime.asyncprocessing.StateRequestType;
 import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
 import org.apache.flink.runtime.state.v2.AbstractMapState;
-import org.apache.flink.runtime.state.v2.MapStateDescriptor;
-import org.apache.flink.runtime.state.v2.StateDescriptor;
 import org.apache.flink.util.Preconditions;
 
 import org.forstdb.ColumnFamilyHandle;
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStReducingState.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStReducingState.java
index da6e1dfbb7c..2f32fc3577d 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStReducingState.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStReducingState.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.state.forst;
 
+import org.apache.flink.api.common.state.v2.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
@@ -30,7 +31,6 @@ import 
org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.v2.AbstractReducingState;
-import org.apache.flink.runtime.state.v2.ReducingStateDescriptor;
 import org.apache.flink.util.Preconditions;
 
 import org.forstdb.ColumnFamilyHandle;
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java
index a3d2c41d0b1..8a6a4dc0c08 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java
@@ -19,6 +19,7 @@
 package org.apache.flink.state.forst;
 
 import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
@@ -31,7 +32,6 @@ import 
org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.v2.AbstractValueState;
-import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
 import org.apache.flink.util.Preconditions;
 
 import org.forstdb.ColumnFamilyHandle;
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java
index c11521a5ed0..5db0c25aa09 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java
@@ -19,8 +19,12 @@
 package org.apache.flink.state.forst;
 
 import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.v2.ListStateDescriptor;
+import org.apache.flink.api.common.state.v2.MapStateDescriptor;
 import org.apache.flink.api.common.state.v2.State;
 import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -36,10 +40,6 @@ import 
org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.v2.AggregatingStateDescriptor;
-import org.apache.flink.runtime.state.v2.ListStateDescriptor;
-import org.apache.flink.runtime.state.v2.MapStateDescriptor;
-import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
 import org.apache.flink.runtime.state.v2.internal.InternalPartitionedState;
 import org.apache.flink.util.function.BiFunctionWithException;
 import org.apache.flink.util.function.FunctionWithException;
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStListStateTest.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStListStateTest.java
index c9adc53a263..1da3a146f31 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStListStateTest.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStListStateTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.state.forst;
 
+import org.apache.flink.api.common.state.v2.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.runtime.state.v2.ListStateDescriptor;
 import org.apache.flink.runtime.state.v2.internal.InternalListState;
 
 import org.junit.jupiter.api.Test;
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java
index dc333dd4e25..7d330a53add 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.state.forst.snapshot;
 
+import org.apache.flink.api.common.state.v2.StateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
@@ -28,7 +29,6 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.SnapshotResources;
 import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
 import 
org.apache.flink.runtime.state.v2.RegisteredKeyValueStateBackendMetaInfo;
-import org.apache.flink.runtime.state.v2.StateDescriptor;
 import org.apache.flink.state.forst.ForStExtension;
 import org.apache.flink.state.forst.ForStOperationUtils;
 import org.apache.flink.state.forst.ForStStateDataTransfer;
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java
index 14752d53a43..4c6a2e82968 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java
@@ -70,14 +70,14 @@ public class MockOperatorStateStore
 
     @Override
     public <K, V> BroadcastState<K, V> getBroadcastState(
-            org.apache.flink.runtime.state.v2.MapStateDescriptor<K, V> 
stateDescriptor)
+            org.apache.flink.api.common.state.v2.MapStateDescriptor<K, V> 
stateDescriptor)
             throws Exception {
         return 
getBroadcastState(StateDescriptorUtils.transformFromV2ToV1(stateDescriptor));
     }
 
     @Override
     public <S> org.apache.flink.api.common.state.v2.ListState<S> getListState(
-            org.apache.flink.runtime.state.v2.ListStateDescriptor<S> 
stateDescriptor)
+            org.apache.flink.api.common.state.v2.ListStateDescriptor<S> 
stateDescriptor)
             throws Exception {
         return new OperatorListStateAdaptor<>(
                 
getListState(StateDescriptorUtils.transformFromV2ToV1(stateDescriptor)));
@@ -85,7 +85,7 @@ public class MockOperatorStateStore
 
     @Override
     public <S> org.apache.flink.api.common.state.v2.ListState<S> 
getUnionListState(
-            org.apache.flink.runtime.state.v2.ListStateDescriptor<S> 
stateDescriptor)
+            org.apache.flink.api.common.state.v2.ListStateDescriptor<S> 
stateDescriptor)
             throws Exception {
         return new OperatorListStateAdaptor<>(
                 
getListState(StateDescriptorUtils.transformFromV2ToV1(stateDescriptor)));
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/state/JoinRecordAsyncStateViews.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/state/JoinRecordAsyncStateViews.java
index 1d690884855..31a4a3447d0 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/state/JoinRecordAsyncStateViews.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/state/JoinRecordAsyncStateViews.java
@@ -20,13 +20,13 @@ package 
org.apache.flink.table.runtime.operators.join.stream.asyncprocessing.sta
 
 import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.state.v2.MapState;
+import org.apache.flink.api.common.state.v2.MapStateDescriptor;
 import org.apache.flink.api.common.state.v2.StateFuture;
 import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.core.state.StateFutureUtils;
-import org.apache.flink.runtime.state.v2.MapStateDescriptor;
-import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.table.data.RowData;
 import 
org.apache.flink.table.runtime.operators.join.stream.utils.JoinInputSideSpec;
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/state/OuterJoinRecordAsyncStateViews.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/state/OuterJoinRecordAsyncStateViews.java
index 013304a1bbc..63c19fd1dd6 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/state/OuterJoinRecordAsyncStateViews.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/state/OuterJoinRecordAsyncStateViews.java
@@ -20,15 +20,15 @@ package 
org.apache.flink.table.runtime.operators.join.stream.asyncprocessing.sta
 
 import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.state.v2.MapState;
+import org.apache.flink.api.common.state.v2.MapStateDescriptor;
 import org.apache.flink.api.common.state.v2.StateFuture;
 import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.core.state.StateFutureUtils;
-import org.apache.flink.runtime.state.v2.MapStateDescriptor;
-import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.table.data.RowData;
 import 
org.apache.flink.table.runtime.operators.join.stream.utils.JoinInputSideSpec;


Reply via email to