This is an automated email from the ASF dual-hosted git repository.
hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6bdb4f752ad [FLINK-34078][state] Move InternalKeyContext classes from
o.a.f.runtime.state.heap to o.a.f.runtime.state package
6bdb4f752ad is described below
commit 6bdb4f752adb2b43dbadd8ad4fffcb4c00568dd3
Author: Jinzhong Li <[email protected]>
AuthorDate: Mon Jan 15 18:29:56 2024 +0800
[FLINK-34078][state] Move InternalKeyContext classes from
o.a.f.runtime.state.heap to o.a.f.runtime.state package
---
.../org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java | 4 ++--
.../org/apache/flink/runtime/state/AbstractKeyedStateBackend.java | 1 -
.../apache/flink/runtime/state/{heap => }/InternalKeyContext.java | 3 +--
.../flink/runtime/state/{heap => }/InternalKeyContextImpl.java | 5 +----
.../org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java | 1 +
.../org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java | 1 +
.../flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java | 2 ++
.../flink/runtime/state/heap/HeapMetaInfoRestoreOperation.java | 1 +
.../org/apache/flink/runtime/state/heap/HeapRestoreOperation.java | 1 +
.../flink/runtime/state/heap/HeapSavepointRestoreOperation.java | 1 +
.../main/java/org/apache/flink/runtime/state/heap/StateTable.java | 1 +
.../java/org/apache/flink/runtime/state/heap/StateTableFactory.java | 1 +
.../apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java | 2 ++
.../apache/flink/runtime/state/heap/InternalKeyContextImplTest.java | 1 +
.../org/apache/flink/runtime/state/heap/MockInternalKeyContext.java | 2 ++
.../apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java | 2 +-
.../flink/runtime/state/ttl/mock/MockKeyedStateBackendBuilder.java | 2 +-
.../org/apache/flink/state/changelog/AbstractStateChangeLogger.java | 2 +-
.../org/apache/flink/state/changelog/ChangelogAggregatingState.java | 2 +-
.../java/org/apache/flink/state/changelog/ChangelogListState.java | 2 +-
.../java/org/apache/flink/state/changelog/ChangelogMapState.java | 2 +-
.../org/apache/flink/state/changelog/ChangelogReducingState.java | 2 +-
.../java/org/apache/flink/state/changelog/ChangelogStateFactory.java | 2 +-
.../java/org/apache/flink/state/changelog/ChangelogValueState.java | 2 +-
.../org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java | 2 +-
.../flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java | 2 +-
.../flink/state/changelog/restore/AggregatingStateChangeApplier.java | 2 +-
.../flink/state/changelog/restore/ChangelogApplierFactory.java | 2 +-
.../flink/state/changelog/restore/ChangelogApplierFactoryImpl.java | 2 +-
.../apache/flink/state/changelog/restore/KvStateChangeApplier.java | 2 +-
.../apache/flink/state/changelog/restore/ListStateChangeApplier.java | 2 +-
.../apache/flink/state/changelog/restore/MapStateChangeApplier.java | 2 +-
.../flink/state/changelog/restore/ReducingStateChangeApplier.java | 2 +-
.../flink/state/changelog/restore/ValueStateChangeApplier.java | 2 +-
.../org/apache/flink/state/changelog/ChangelogListStateTest.java | 2 +-
.../java/org/apache/flink/state/changelog/ChangelogMapStateTest.java | 2 +-
.../apache/flink/state/changelog/KvStateChangeLoggerImplTest.java | 2 +-
.../state/changelog/PriorityQueueStateChangeLoggerImplTest.java | 2 +-
.../org/apache/flink/state/changelog/StateChangeLoggerTestBase.java | 2 +-
.../flink/contrib/streaming/state/RocksDBKeyedStateBackend.java | 2 +-
.../contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java | 4 ++--
.../org/apache/flink/streaming/runtime/tasks/TestStateBackend.java | 4 ++--
42 files changed, 47 insertions(+), 38 deletions(-)
diff --git
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java
index 41db35aefd6..334bd3b75fe 100644
---
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java
+++
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java
@@ -36,6 +36,8 @@ import
org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.InternalKeyContext;
+import org.apache.flink.runtime.state.InternalKeyContextImpl;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.Keyed;
@@ -49,8 +51,6 @@ import
org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
-import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.state.ttl.mock.MockRestoreOperation;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index f9c26725093..6225c1dac12 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -28,7 +28,6 @@ import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateFactory;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContext.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalKeyContext.java
similarity index 95%
rename from
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContext.java
rename to
flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalKeyContext.java
index 667f65fbd60..b4155f3b47c 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContext.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalKeyContext.java
@@ -16,10 +16,9 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.state.heap;
+package org.apache.flink.runtime.state;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.state.KeyGroupRange;
import javax.annotation.Nonnull;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContextImpl.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalKeyContextImpl.java
similarity index 93%
rename from
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContextImpl.java
rename to
flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalKeyContextImpl.java
index 67e794ac959..6c0977448bb 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContextImpl.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalKeyContextImpl.java
@@ -16,10 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.state.heap;
-
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+package org.apache.flink.runtime.state;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
index 7227bf31fd6..787b4031ba0 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.state.heap;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import javax.annotation.Nonnull;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 7edcfe45db5..daabc9302c8 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.HeapPriorityQueuesManager;
+import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.Keyed;
import org.apache.flink.runtime.state.KeyedStateFunction;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
index 6db0f7e4ad5..0406f1bd60d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
@@ -24,6 +24,8 @@ import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackendBuilder;
import org.apache.flink.runtime.state.BackendBuildingException;
+import org.apache.flink.runtime.state.InternalKeyContext;
+import org.apache.flink.runtime.state.InternalKeyContextImpl;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMetaInfoRestoreOperation.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMetaInfoRestoreOperation.java
index 8badfd28c70..b9bfee0e9a2 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMetaInfoRestoreOperation.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMetaInfoRestoreOperation.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state.heap;
+import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.KeyExtractorFunction;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.Keyed;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
index 89fe3e88281..fcf0777452c 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointRestoreOperation.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointRestoreOperation.java
index d52b7093556..984b4de31bd 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointRestoreOperation.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointRestoreOperation.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.ListDelimitedSerializer;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
index 9120da41581..af48a7f4e08 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state.heap;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.IterableStateSnapshot;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableFactory.java
index c6bed4997fd..617d716e1a6 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableFactory.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.state.heap;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
/**
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
index 1c53e309c3d..537c0edd37f 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
@@ -24,6 +24,8 @@ import
org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.InternalKeyContext;
+import org.apache.flink.runtime.state.InternalKeyContextImpl;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.StateSnapshot;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/InternalKeyContextImplTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/InternalKeyContextImplTest.java
index 77f718573bd..bade8448f4e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/InternalKeyContextImplTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/InternalKeyContextImplTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.flink.runtime.state.heap;
+import org.apache.flink.runtime.state.InternalKeyContextImpl;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.junit.jupiter.api.Test;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/MockInternalKeyContext.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/MockInternalKeyContext.java
index de1d7e4efe2..4f9fd6e368a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/MockInternalKeyContext.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/MockInternalKeyContext.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.state.heap;
+import org.apache.flink.runtime.state.InternalKeyContext;
+import org.apache.flink.runtime.state.InternalKeyContextImpl;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
index b8d5bc1d130..807356afa32 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.KeyExtractorFunction;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
@@ -46,7 +47,6 @@ import
org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTran
import org.apache.flink.runtime.state.StateSnapshotTransformers;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSet;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.ttl.TtlStateFactory;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackendBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackendBuilder.java
index 47383ed9760..14f36ecfd2e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackendBuilder.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackendBuilder.java
@@ -23,11 +23,11 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackendBuilder;
+import org.apache.flink.runtime.state.InternalKeyContextImpl;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
-import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import
org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackend.MockSnapshotSupplier;
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
index fa1af755752..0f626135429 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
@@ -19,11 +19,11 @@ package org.apache.flink.state.changelog;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import
org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters;
import org.apache.flink.util.function.ThrowingConsumer;
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogAggregatingState.java
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogAggregatingState.java
index 2fa97124108..780f49be291 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogAggregatingState.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogAggregatingState.java
@@ -20,8 +20,8 @@ package org.apache.flink.state.changelog;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.State;
+import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.changelog.StateChange;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.state.changelog.restore.ChangelogApplierFactory;
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogListState.java
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogListState.java
index 9d077971ba7..d1df87c8765 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogListState.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogListState.java
@@ -21,8 +21,8 @@ package org.apache.flink.state.changelog;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.changelog.StateChange;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.state.changelog.restore.ChangelogApplierFactory;
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java
index f2829655203..0beebacc9e3 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.changelog.StateChange;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.state.changelog.restore.ChangelogApplierFactory;
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogReducingState.java
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogReducingState.java
index 068754ddcf5..c4ff3ec6f44 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogReducingState.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogReducingState.java
@@ -20,8 +20,8 @@ package org.apache.flink.state.changelog;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.State;
+import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.changelog.StateChange;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.state.changelog.restore.ChangelogApplierFactory;
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateFactory.java
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateFactory.java
index 52aaa7fad3f..aa0ee677732 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateFactory.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateFactory.java
@@ -21,8 +21,8 @@ import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.FlinkRuntimeException;
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogValueState.java
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogValueState.java
index d37026b7128..84caa7b3aa3 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogValueState.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogValueState.java
@@ -20,8 +20,8 @@ package org.apache.flink.state.changelog;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.changelog.StateChange;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.state.changelog.restore.ChangelogApplierFactory;
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java
index cf346295969..04a8b659fed 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java
@@ -21,10 +21,10 @@ import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java
index 0f348b7c1d8..b1785e32822 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java
@@ -19,10 +19,10 @@ package org.apache.flink.state.changelog;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.InternalKeyContext;
import
org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
import java.io.IOException;
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/AggregatingStateChangeApplier.java
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/AggregatingStateChangeApplier.java
index aabe8cddc76..d990cb507a1 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/AggregatingStateChangeApplier.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/AggregatingStateChangeApplier.java
@@ -18,7 +18,7 @@
package org.apache.flink.state.changelog.restore;
import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.state.changelog.StateChangeOperation;
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogApplierFactory.java
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogApplierFactory.java
index 7c6c0598de9..12a864bbdc8 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogApplierFactory.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogApplierFactory.java
@@ -19,8 +19,8 @@ package org.apache.flink.state.changelog.restore;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogApplierFactoryImpl.java
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogApplierFactoryImpl.java
index b11df22bfb2..7c4476e3673 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogApplierFactoryImpl.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogApplierFactoryImpl.java
@@ -18,8 +18,8 @@
package org.apache.flink.state.changelog.restore;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/KvStateChangeApplier.java
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/KvStateChangeApplier.java
index 0fe8b995f3c..19248f179c9 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/KvStateChangeApplier.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/KvStateChangeApplier.java
@@ -18,8 +18,8 @@
package org.apache.flink.state.changelog.restore;
import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.internal.InternalMergingState;
import org.apache.flink.state.changelog.StateChangeOperation;
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ListStateChangeApplier.java
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ListStateChangeApplier.java
index 1f7b05e127b..aaac5341f3e 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ListStateChangeApplier.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ListStateChangeApplier.java
@@ -19,7 +19,7 @@ package org.apache.flink.state.changelog.restore;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.state.changelog.StateChangeOperation;
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/MapStateChangeApplier.java
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/MapStateChangeApplier.java
index b498329c5ba..6eb64213488 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/MapStateChangeApplier.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/MapStateChangeApplier.java
@@ -19,7 +19,7 @@ package org.apache.flink.state.changelog.restore;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.state.changelog.StateChangeOperation;
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ReducingStateChangeApplier.java
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ReducingStateChangeApplier.java
index 80aeaf9b383..9bd94238e65 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ReducingStateChangeApplier.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ReducingStateChangeApplier.java
@@ -18,7 +18,7 @@
package org.apache.flink.state.changelog.restore;
import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.state.changelog.StateChangeOperation;
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ValueStateChangeApplier.java
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ValueStateChangeApplier.java
index c32db7d906a..8525451b1bf 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ValueStateChangeApplier.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ValueStateChangeApplier.java
@@ -18,7 +18,7 @@
package org.apache.flink.state.changelog.restore;
import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.state.changelog.StateChangeOperation;
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogListStateTest.java
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogListStateTest.java
index 9a7420b80e4..16f58c48fdc 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogListStateTest.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogListStateTest.java
@@ -21,8 +21,8 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.runtime.state.InternalKeyContextImpl;
import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.ThrowingConsumer;
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogMapStateTest.java
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogMapStateTest.java
index f28befc060f..058e96ad3f1 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogMapStateTest.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogMapStateTest.java
@@ -21,8 +21,8 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.runtime.state.InternalKeyContextImpl;
import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.ThrowingConsumer;
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/KvStateChangeLoggerImplTest.java
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/KvStateChangeLoggerImplTest.java
index 50a60707465..523401c5569 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/KvStateChangeLoggerImplTest.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/KvStateChangeLoggerImplTest.java
@@ -20,8 +20,8 @@ package org.apache.flink.state.changelog;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.InternalKeyContextImpl;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
-import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
import java.io.IOException;
import java.util.Collections;
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java
index d41b447bd11..ebb0a509561 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java
@@ -18,8 +18,8 @@
package org.apache.flink.state.changelog;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.state.InternalKeyContextImpl;
import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
-import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
/** {@link PriorityQueueStateChangeLoggerImpl} test. */
public class PriorityQueueStateChangeLoggerImplTest extends StateChangeLoggerTestBase<Void> {
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
index bd57c8a3680..38efdebb255 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
@@ -18,10 +18,10 @@
package org.apache.flink.state.changelog;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.InternalKeyContextImpl;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
-import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
import org.junit.Test;
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 9766ebde385..c91b63963f6 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
import org.apache.flink.runtime.state.HeapPriorityQueuesManager;
+import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.Keyed;
import org.apache.flink.runtime.state.KeyedStateHandle;
@@ -58,7 +59,6 @@ import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.FileUtils;
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index 6d2ca01344d..f33ce8f0fba 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -39,6 +39,8 @@ import org.apache.flink.runtime.state.BackendBuildingException;
import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
+import org.apache.flink.runtime.state.InternalKeyContext;
+import org.apache.flink.runtime.state.InternalKeyContextImpl;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
@@ -48,8 +50,6 @@ import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
-import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.FileUtils;
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestStateBackend.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestStateBackend.java
index 20da063f10c..ab2a0629fcc 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestStateBackend.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestStateBackend.java
@@ -32,6 +32,8 @@ import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
+import org.apache.flink.runtime.state.InternalKeyContext;
+import org.apache.flink.runtime.state.InternalKeyContextImpl;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.Keyed;
import org.apache.flink.runtime.state.KeyedStateHandle;
@@ -46,8 +48,6 @@ import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.SnapshotStrategyRunner;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
-import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;