This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7cf378347fceb222e9175abe8f51742332063bf1
Author: Roman Khachatryan <[email protected]>
AuthorDate: Mon Jun 21 18:08:53 2021 +0200

    [hotfix][state/changelog] Rename StateChangelogHandle to 
ChangelogStateHandle
---
 .../metadata/MetadataV2V3SerializerBase.java       | 22 +++++++++++-----------
 .../changelog/ChangelogStateBackendHandle.java     | 12 ++++++------
 ...ngelogHandle.java => ChangelogStateHandle.java} |  2 +-
 ...pl.java => ChangelogStateHandleStreamImpl.java} | 10 +++++-----
 .../changelog/StateChangelogHandleReader.java      |  4 ++--
 .../StateChangelogHandleStreamHandleReader.java    |  8 ++++----
 .../state/changelog/StateChangelogStorage.java     |  2 +-
 .../state/changelog/StateChangelogWriter.java      |  2 +-
 ...ndle.java => InMemoryChangelogStateHandle.java} | 10 +++++-----
 .../inmemory/InMemoryStateChangelogStorage.java    |  4 ++--
 .../inmemory/InMemoryStateChangelogWriter.java     |  6 +++---
 .../inmemory/StateChangelogStorageTest.java        |  4 ++--
 .../changelog/ChangelogKeyedStateBackend.java      | 12 ++++++------
 .../restore/ChangelogBackendRestoreOperation.java  |  8 ++++----
 14 files changed, 53 insertions(+), 53 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
index 1be1fa5..b772232 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
@@ -39,10 +39,10 @@ import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
 import org.apache.flink.runtime.state.changelog.StateChange;
-import org.apache.flink.runtime.state.changelog.StateChangelogHandle;
-import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamImpl;
-import 
org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogHandle;
+import 
org.apache.flink.runtime.state.changelog.inmemory.InMemoryChangelogStateHandle;
 import 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.filesystem.RelativeFileStateHandle;
@@ -337,8 +337,8 @@ public abstract class MetadataV2V3SerializerBase {
                 serializeKeyedStateHandle(k, dos);
             }
 
-        } else if (stateHandle instanceof InMemoryStateChangelogHandle) {
-            InMemoryStateChangelogHandle handle = 
(InMemoryStateChangelogHandle) stateHandle;
+        } else if (stateHandle instanceof InMemoryChangelogStateHandle) {
+            InMemoryChangelogStateHandle handle = 
(InMemoryChangelogStateHandle) stateHandle;
             dos.writeByte(CHANGELOG_BYTE_INCREMENT_HANDLE);
             dos.writeInt(handle.getKeyGroupRange().getStartKeyGroup());
             dos.writeInt(handle.getKeyGroupRange().getNumberOfKeyGroups());
@@ -351,8 +351,8 @@ public abstract class MetadataV2V3SerializerBase {
                 dos.write(change.getChange());
             }
 
-        } else if (stateHandle instanceof StateChangelogHandleStreamImpl) {
-            StateChangelogHandleStreamImpl handle = 
(StateChangelogHandleStreamImpl) stateHandle;
+        } else if (stateHandle instanceof ChangelogStateHandleStreamImpl) {
+            ChangelogStateHandleStreamImpl handle = 
(ChangelogStateHandleStreamImpl) stateHandle;
             dos.writeByte(CHANGELOG_FILE_INCREMENT_HANDLE);
             dos.writeInt(handle.getKeyGroupRange().getStartKeyGroup());
             dos.writeInt(handle.getKeyGroupRange().getNumberOfKeyGroups());
@@ -438,9 +438,9 @@ public abstract class MetadataV2V3SerializerBase {
                 base.add(deserializeKeyedStateHandle(dis, context));
             }
             int deltaSize = dis.readInt();
-            List<StateChangelogHandle> delta = new ArrayList<>(deltaSize);
+            List<ChangelogStateHandle> delta = new ArrayList<>(deltaSize);
             for (int i = 0; i < deltaSize; i++) {
-                delta.add((StateChangelogHandle) 
deserializeKeyedStateHandle(dis, context));
+                delta.add((ChangelogStateHandle) 
deserializeKeyedStateHandle(dis, context));
             }
             return new 
ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl(
                     base, delta, keyGroupRange);
@@ -460,7 +460,7 @@ public abstract class MetadataV2V3SerializerBase {
                 checkState(bytesSize == dis.read(bytes));
                 changes.add(new StateChange(keyGroup, bytes));
             }
-            return new InMemoryStateChangelogHandle(changes, from, to, 
keyGroupRange);
+            return new InMemoryChangelogStateHandle(changes, from, to, 
keyGroupRange);
 
         } else if (CHANGELOG_FILE_INCREMENT_HANDLE == type) {
             int start = dis.readInt();
@@ -475,7 +475,7 @@ public abstract class MetadataV2V3SerializerBase {
                 streamHandleAndOffset.add(Tuple2.of(h, o));
             }
             long size = dis.readLong();
-            return new StateChangelogHandleStreamImpl(streamHandleAndOffset, 
keyGroupRange, size);
+            return new ChangelogStateHandleStreamImpl(streamHandleAndOffset, 
keyGroupRange, size);
 
         } else {
             throw new IllegalStateException("Reading invalid KeyedStateHandle, 
type: " + type);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
index 7940210..1046fa0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
@@ -46,17 +46,17 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
 public interface ChangelogStateBackendHandle extends KeyedStateHandle {
     List<KeyedStateHandle> getMaterializedStateHandles();
 
-    List<StateChangelogHandle> getNonMaterializedStateHandles();
+    List<ChangelogStateHandle> getNonMaterializedStateHandles();
 
     class ChangelogStateBackendHandleImpl implements 
ChangelogStateBackendHandle {
         private static final long serialVersionUID = 1L;
         private final List<KeyedStateHandle> materialized;
-        private final List<StateChangelogHandle> nonMaterialized;
+        private final List<ChangelogStateHandle> nonMaterialized;
         private final KeyGroupRange keyGroupRange;
 
         public ChangelogStateBackendHandleImpl(
                 List<KeyedStateHandle> materialized,
-                List<StateChangelogHandle> nonMaterialized,
+                List<ChangelogStateHandle> nonMaterialized,
                 KeyGroupRange keyGroupRange) {
             this.materialized = unmodifiableList(materialized);
             this.nonMaterialized = unmodifiableList(nonMaterialized);
@@ -96,11 +96,11 @@ public interface ChangelogStateBackendHandle extends 
KeyedStateHandle {
                             .map(handle -> 
handle.getIntersection(keyGroupRange))
                             .filter(Objects::nonNull)
                             .collect(Collectors.toList());
-            List<StateChangelogHandle> deltaPart =
+            List<ChangelogStateHandle> deltaPart =
                     this.nonMaterialized.stream()
                             .map(
                                     handle ->
-                                            (StateChangelogHandle)
+                                            (ChangelogStateHandle)
                                                     
handle.getIntersection(keyGroupRange))
                             .filter(Objects::nonNull)
                             .collect(Collectors.toList());
@@ -119,7 +119,7 @@ public interface ChangelogStateBackendHandle extends 
KeyedStateHandle {
         }
 
         @Override
-        public List<StateChangelogHandle> getNonMaterializedStateHandles() {
+        public List<ChangelogStateHandle> getNonMaterializedStateHandles() {
             return nonMaterialized;
         }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateHandle.java
similarity index 94%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandle.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateHandle.java
index c200b28..070afeb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateHandle.java
@@ -22,4 +22,4 @@ import org.apache.flink.runtime.state.KeyedStateHandle;
 
 /** A handle to saved {@link StateChange state changes}. */
 @Internal
-public interface StateChangelogHandle extends KeyedStateHandle {}
+public interface ChangelogStateHandle extends KeyedStateHandle {}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateHandleStreamImpl.java
similarity index 94%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamImpl.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateHandleStreamImpl.java
index 24c1405..d36b8ad 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateHandleStreamImpl.java
@@ -36,9 +36,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
-/** {@link StateChangelogHandle} implementation based on {@link 
StreamStateHandle}. */
+/** {@link ChangelogStateHandle} implementation based on {@link 
StreamStateHandle}. */
 @Internal
-public final class StateChangelogHandleStreamImpl implements 
StateChangelogHandle {
+public final class ChangelogStateHandleStreamImpl implements 
ChangelogStateHandle {
 
     private static final long serialVersionUID = -8070326169926626355L;
 
@@ -49,7 +49,7 @@ public final class StateChangelogHandleStreamImpl implements 
StateChangelogHandl
     private transient SharedStateRegistry stateRegistry;
     private final long size;
 
-    public StateChangelogHandleStreamImpl(
+    public ChangelogStateHandleStreamImpl(
             List<Tuple2<StreamStateHandle, Long>> handlesAndOffsets,
             KeyGroupRange keyGroupRange,
             long size) {
@@ -58,7 +58,7 @@ public final class StateChangelogHandleStreamImpl implements 
StateChangelogHandl
         this.size = size;
     }
 
-    public StateChangelogHandleStreamImpl(
+    public ChangelogStateHandleStreamImpl(
             List<Tuple3<StreamStateHandle, Long, Long>> sorted, KeyGroupRange 
keyGroupRange) {
         this(
                 sorted.stream().map(t -> Tuple2.of(t.f0, 
t.f1)).collect(Collectors.toList()),
@@ -87,7 +87,7 @@ public final class StateChangelogHandleStreamImpl implements 
StateChangelogHandl
         if (offsets.getNumberOfKeyGroups() == 0) {
             return null;
         }
-        return new StateChangelogHandleStreamImpl(handlesAndOffsets, offsets, 
0L /* unknown */);
+        return new ChangelogStateHandleStreamImpl(handlesAndOffsets, offsets, 
0L /* unknown */);
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleReader.java
index 9edd7e8..7c71ac1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleReader.java
@@ -22,8 +22,8 @@ import org.apache.flink.util.CloseableIterator;
 
 import java.io.IOException;
 
-/** Allows to read state changelog referenced by the provided {@link 
StateChangelogHandle}. */
+/** Allows to read state changelog referenced by the provided {@link 
ChangelogStateHandle}. */
 @Internal
-public interface StateChangelogHandleReader<Handle extends 
StateChangelogHandle> {
+public interface StateChangelogHandleReader<Handle extends 
ChangelogStateHandle> {
     CloseableIterator<StateChange> getChanges(Handle handle) throws 
IOException;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamHandleReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamHandleReader.java
index e586d95..900d781 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamHandleReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamHandleReader.java
@@ -30,16 +30,16 @@ import java.io.IOException;
 import java.util.Iterator;
 
 /**
- * A reader for {@link StateChangelogHandleStreamImpl} that iterates over its 
underlying {@link
+ * A reader for {@link ChangelogStateHandleStreamImpl} that iterates over its 
underlying {@link
  * StreamStateHandle stream handles} and offsets. Starting from each offset, 
it enumerates the
  * {@link StateChange state changes} using the provided {@link 
StateChangeIterator}. Different
  * {@link StateChangelogStorage} implementations may have different 
<b>iterator</b> implementations.
- * Using a different {@link StateChangelogHandle} (and reader) is problematic 
as it needs to be
+ * Using a different {@link ChangelogStateHandle} (and reader) is problematic 
as it needs to be
  * serialized.
  */
 @Internal
 public class StateChangelogHandleStreamHandleReader
-        implements StateChangelogHandleReader<StateChangelogHandleStreamImpl> {
+        implements StateChangelogHandleReader<ChangelogStateHandleStreamImpl> {
     private static final Logger LOG =
             
LoggerFactory.getLogger(StateChangelogHandleStreamHandleReader.class);
 
@@ -56,7 +56,7 @@ public class StateChangelogHandleStreamHandleReader
     }
 
     @Override
-    public CloseableIterator<StateChange> 
getChanges(StateChangelogHandleStreamImpl handle)
+    public CloseableIterator<StateChange> 
getChanges(ChangelogStateHandleStreamImpl handle)
             throws IOException {
         return new CloseableIterator<StateChange>() {
             private final Iterator<Tuple2<StreamStateHandle, Long>> 
handleIterator =
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorage.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorage.java
index 699d585..1cc08d3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorage.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorage.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
  * {@link StateChangelogStorageLoader} to obtain an instance.
  */
 @Internal
-public interface StateChangelogStorage<Handle extends StateChangelogHandle> 
extends AutoCloseable {
+public interface StateChangelogStorage<Handle extends ChangelogStateHandle> 
extends AutoCloseable {
 
     StateChangelogWriter<Handle> createWriter(String operatorID, KeyGroupRange 
keyGroupRange);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
index 48cf9c9..694973d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
@@ -24,7 +24,7 @@ import java.util.concurrent.CompletableFuture;
 
 /** Allows to write data to the log. Scoped to a single writer (e.g. state 
backend). */
 @Internal
-public interface StateChangelogWriter<Handle extends StateChangelogHandle> 
extends AutoCloseable {
+public interface StateChangelogWriter<Handle extends ChangelogStateHandle> 
extends AutoCloseable {
 
     /** Get the initial {@link SequenceNumber} that is used for the first 
element. */
     SequenceNumber initialSequenceNumber();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryChangelogStateHandle.java
similarity index 92%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryChangelogStateHandle.java
index 8735f98..7f5695b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryChangelogStateHandle.java
@@ -21,18 +21,18 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 import org.apache.flink.runtime.state.changelog.StateChange;
-import org.apache.flink.runtime.state.changelog.StateChangelogHandle;
 
 import javax.annotation.Nullable;
 
 import java.util.Collections;
 import java.util.List;
 
-/** In-memory {@link StateChangelogHandle}. */
+/** In-memory {@link ChangelogStateHandle}. */
 @Internal
-public class InMemoryStateChangelogHandle implements StateChangelogHandle {
+public class InMemoryChangelogStateHandle implements ChangelogStateHandle {
 
     private static final long serialVersionUID = 1L;
 
@@ -41,12 +41,12 @@ public class InMemoryStateChangelogHandle implements 
StateChangelogHandle {
     private final SequenceNumber to; // for debug purposes
     private final KeyGroupRange keyGroupRange;
 
-    public InMemoryStateChangelogHandle(
+    public InMemoryChangelogStateHandle(
             List<StateChange> changes, long from, long to, KeyGroupRange 
keyGroupRange) {
         this(changes, SequenceNumber.of(from), SequenceNumber.of(to), 
keyGroupRange);
     }
 
-    public InMemoryStateChangelogHandle(
+    public InMemoryChangelogStateHandle(
             List<StateChange> changes,
             SequenceNumber from,
             SequenceNumber to,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorage.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorage.java
index 2a751165..c9bfdfb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorage.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorage.java
@@ -24,7 +24,7 @@ import org.apache.flink.util.CloseableIterator;
 
 /** An in-memory (non-production) implementation of {@link 
StateChangelogStorage}. */
 public class InMemoryStateChangelogStorage
-        implements StateChangelogStorage<InMemoryStateChangelogHandle> {
+        implements StateChangelogStorage<InMemoryChangelogStateHandle> {
 
     @Override
     public InMemoryStateChangelogWriter createWriter(
@@ -33,7 +33,7 @@ public class InMemoryStateChangelogStorage
     }
 
     @Override
-    public StateChangelogHandleReader<InMemoryStateChangelogHandle> 
createReader() {
+    public StateChangelogHandleReader<InMemoryChangelogStateHandle> 
createReader() {
         return handle -> CloseableIterator.fromList(handle.getChanges(), 
change -> {});
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
index 3c15984..cc12045 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
@@ -42,7 +42,7 @@ import java.util.stream.Stream;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 
 @NotThreadSafe
-class InMemoryStateChangelogWriter implements 
StateChangelogWriter<InMemoryStateChangelogHandle> {
+class InMemoryStateChangelogWriter implements 
StateChangelogWriter<InMemoryChangelogStateHandle> {
     private static final Logger LOG = 
LoggerFactory.getLogger(InMemoryStateChangelogWriter.class);
     private static final SequenceNumber INITIAL_SQN = SequenceNumber.of(0L);
 
@@ -75,11 +75,11 @@ class InMemoryStateChangelogWriter implements 
StateChangelogWriter<InMemoryState
     }
 
     @Override
-    public CompletableFuture<InMemoryStateChangelogHandle> 
persist(SequenceNumber from) {
+    public CompletableFuture<InMemoryChangelogStateHandle> 
persist(SequenceNumber from) {
         LOG.debug("Persist after {}", from);
         Preconditions.checkNotNull(from);
         return completedFuture(
-                new InMemoryStateChangelogHandle(collectChanges(from), from, 
sqn, keyGroupRange));
+                new InMemoryChangelogStateHandle(collectChanges(from), from, 
sqn, keyGroupRange));
     }
 
     private List<StateChange> collectChanges(SequenceNumber after) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
index cb31c38..a0dfa8b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
@@ -19,9 +19,9 @@ package org.apache.flink.runtime.state.changelog.inmemory;
 
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 import org.apache.flink.runtime.state.changelog.StateChange;
-import org.apache.flink.runtime.state.changelog.StateChangelogHandle;
 import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
 import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
@@ -49,7 +49,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 
 /** {@link InMemoryStateChangelogStorage} test. */
-public class StateChangelogStorageTest<T extends StateChangelogHandle> {
+public class StateChangelogStorageTest<T extends ChangelogStateHandle> {
 
     private final Random random = new Random();
 
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
index f7a83f0..375ced2 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
@@ -46,8 +46,8 @@ import 
org.apache.flink.runtime.state.StateSnapshotTransformer;
 import org.apache.flink.runtime.state.TestableKeyedStateBackend;
 import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
 import 
org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
-import org.apache.flink.runtime.state.changelog.StateChangelogHandle;
 import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
 import org.apache.flink.runtime.state.heap.InternalKeyContext;
@@ -130,7 +130,7 @@ public class ChangelogKeyedStateBackend<K>
 
     private final TtlTimeProvider ttlTimeProvider;
 
-    private final StateChangelogWriter<StateChangelogHandle> 
stateChangelogWriter;
+    private final StateChangelogWriter<ChangelogStateHandle> 
stateChangelogWriter;
 
     private long lastCheckpointId = -1L;
 
@@ -150,7 +150,7 @@ public class ChangelogKeyedStateBackend<K>
 
     /** Updated initially on restore and later cleared upon materialization 
(in FLINK-21357). */
     @GuardedBy("materialized")
-    private final List<StateChangelogHandle> restoredNonMaterialized = new 
ArrayList<>();
+    private final List<ChangelogStateHandle> restoredNonMaterialized = new 
ArrayList<>();
 
     /**
      * {@link SequenceNumber} denoting last upload range <b>start</b>, 
inclusive. Updated to {@link
@@ -181,7 +181,7 @@ public class ChangelogKeyedStateBackend<K>
             AbstractKeyedStateBackend<K> keyedStateBackend,
             ExecutionConfig executionConfig,
             TtlTimeProvider ttlTimeProvider,
-            StateChangelogWriter<StateChangelogHandle> stateChangelogWriter,
+            StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter,
             Collection<ChangelogStateBackendHandle> initialState) {
         this.keyedStateBackend = keyedStateBackend;
         this.executionConfig = executionConfig;
@@ -324,13 +324,13 @@ public class ChangelogKeyedStateBackend<K>
                         .thenApply(this::buildSnapshotResult));
     }
 
-    private SnapshotResult<KeyedStateHandle> 
buildSnapshotResult(StateChangelogHandle delta) {
+    private SnapshotResult<KeyedStateHandle> 
buildSnapshotResult(ChangelogStateHandle delta) {
         // Can be called by either task thread during the sync checkpoint 
phase (if persist future
         // was already completed); or by the writer thread otherwise. So need 
to synchronize.
         // todo: revisit after FLINK-21357 - use mailbox action?
         synchronized (materialized) {
             // collections don't change once started and handles are immutable
-            List<StateChangelogHandle> prevDeltaCopy = new 
ArrayList<>(restoredNonMaterialized);
+            List<ChangelogStateHandle> prevDeltaCopy = new 
ArrayList<>(restoredNonMaterialized);
             if (delta != null && delta.getStateSize() > 0) {
                 prevDeltaCopy.add(delta);
             }
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation.java
index 528e0ec..4ab881c 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation.java
@@ -21,8 +21,8 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
 import org.apache.flink.runtime.state.changelog.StateChange;
-import org.apache.flink.runtime.state.changelog.StateChangelogHandle;
 import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
 import org.apache.flink.state.changelog.ChangelogKeyedStateBackend;
 import org.apache.flink.util.CloseableIterator;
@@ -55,7 +55,7 @@ public class ChangelogBackendRestoreOperation {
                     ChangelogKeyedStateBackend<K>,
                     Exception> {}
 
-    public static <K, T extends StateChangelogHandle> 
ChangelogKeyedStateBackend<K> restore(
+    public static <K, T extends ChangelogStateHandle> 
ChangelogKeyedStateBackend<K> restore(
             StateChangelogHandleReader<T> changelogHandleReader,
             ClassLoader classLoader,
             Collection<ChangelogStateBackendHandle> stateHandles,
@@ -76,13 +76,13 @@ public class ChangelogBackendRestoreOperation {
     }
 
     @SuppressWarnings("unchecked")
-    private static <T extends StateChangelogHandle> void readBackendHandle(
+    private static <T extends ChangelogStateHandle> void readBackendHandle(
             ChangelogKeyedStateBackend<?> backend,
             ChangelogStateBackendHandle backendHandle,
             StateChangelogHandleReader<T> changelogHandleReader,
             ClassLoader classLoader)
             throws Exception {
-        for (StateChangelogHandle changelogHandle :
+        for (ChangelogStateHandle changelogHandle :
                 backendHandle.getNonMaterializedStateHandles()) {
             try (CloseableIterator<StateChange> changes =
                     changelogHandleReader.getChanges((T) changelogHandle)) {

Reply via email to