This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c23a3002b83 [FLINK-30863][state/changelog] Register local recovery 
files of changelog before notifyCheckpointComplete()
c23a3002b83 is described below

commit c23a3002b837ecffa2d561a405c17c979c814c6e
Author: Yanfei Lei <[email protected]>
AuthorDate: Tue Aug 29 16:37:16 2023 +0800

    [FLINK-30863][state/changelog] Register local recovery files of changelog 
before notifyCheckpointComplete()
    
    This closes https://github.com/apache/flink/pull/21822
---
 .../fs/DuplicatingStateChangeFsUploader.java       |  12 +-
 .../changelog/fs/FsStateChangelogStorage.java      |   2 +
 .../flink/changelog/fs/FsStateChangelogWriter.java |  48 ++--
 .../changelog/fs/ChangelogStorageMetricsTest.java  |  16 +-
 .../fs/DiscardRecordableStateChangeUploader.java   |   3 +-
 .../fs/FsStateChangelogWriterSqnTest.java          |   2 +-
 .../changelog/fs/FsStateChangelogWriterTest.java   | 263 +++++++++++++++++++--
 .../state/changelog/LocalChangelogRegistry.java    |  15 +-
 .../changelog/LocalChangelogRegistryImpl.java      |  26 +-
 .../state/changelog/StateChangelogWriter.java      |  11 +-
 .../inmemory/InMemoryStateChangelogWriter.java     |   5 +-
 .../runtime/state/TestLocalRecoveryConfig.java     |   4 +
 .../changelog/LocalChangelogRegistryTest.java      |   2 +-
 .../inmemory/StateChangelogStorageTest.java        |   2 +-
 .../changelog/ChangelogKeyedStateBackend.java      |   2 +-
 .../state/changelog/ChangelogTruncateHelper.java   |   1 -
 .../state/changelog/StateChangeLoggerTestBase.java |   6 +-
 17 files changed, 319 insertions(+), 101 deletions(-)

diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java
index cf99ed17683..50db749b973 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java
@@ -26,6 +26,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.ChangelogTaskLocalStateStore;
 import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
 import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.changelog.LocalChangelogRegistry;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 
 import org.slf4j.Logger;
@@ -51,14 +52,13 @@ import static 
org.apache.flink.runtime.state.ChangelogTaskLocalStateStore.getLoc
  *       <li>Store the meta of files into {@link ChangelogTaskLocalStateStore} 
by
  *           AsyncCheckpointRunnable#reportCompletedSnapshotStates().
  *       <li>Pass control of the file to {@link 
LocalChangelogRegistry#register} when
- *           ChangelogKeyedStateBackend#notifyCheckpointComplete() , files of 
the previous
- *           checkpoint will be deleted by {@link 
LocalChangelogRegistry#discardUpToCheckpoint} at
- *           the same time.
+ *           FsStateChangelogWriter#persist; files of the previous checkpoint 
will be deleted by
+ *           {@link LocalChangelogRegistry#discardUpToCheckpoint} when the 
checkpoint is confirmed.
  *       <li>When ChangelogTruncateHelper#materialized() or
  *           ChangelogTruncateHelper#checkpointSubsumed() is called, {@link
- *           TaskChangelogRegistry#notUsed} is responsible for deleting local 
files.
- *       <li>When one checkpoint is aborted, the dstl files of this checkpoint 
will be deleted by
- *           {@link LocalChangelogRegistry#prune} in {@link 
FsStateChangelogWriter#reset}.
+ *           TaskChangelogRegistry#release} is responsible for deleting local 
files.
+ *       <li>When one checkpoint is aborted, all accumulated local dstl files 
will be deleted at
+ *           once.
  *     </ol>
  */
 public class DuplicatingStateChangeFsUploader extends 
AbstractStateChangeFsUploader {
diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
index 5ba18f701ea..1d4668cb8e8 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.runtime.state.changelog.LocalChangelogRegistry;
 import org.apache.flink.runtime.state.changelog.LocalChangelogRegistryImpl;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
 
+import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -167,6 +168,7 @@ public class FsStateChangelogStorage extends 
FsStateChangelogStorageForRecovery
     @Override
     public void close() throws Exception {
         uploader.close();
+        IOUtils.closeQuietly(localChangelogRegistry);
     }
 
     @Override
diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
index aebb9adf58f..f0a4b21646e 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
@@ -63,9 +63,9 @@ import static org.apache.flink.util.Preconditions.checkState;
  * thread synchronization); {@link SequenceNumber} is not changed.
  *
  * <p>However, if they exceed {@link #preEmptivePersistThresholdInBytes} then 
{@link
- * #persistInternal(SequenceNumber) persist} is called.
+ * #persistInternal(SequenceNumber, long) persist} is called.
  *
- * <p>On {@link #persist(SequenceNumber) persist}, accumulated changes are 
sent to the {@link
+ * <p>On {@link #persist(SequenceNumber, long) persist}, accumulated changes 
are sent to the {@link
  * StateChangeUploadScheduler} as an immutable {@link 
StateChangeUploadScheduler.UploadTask task}.
  * An {@link FsStateChangelogWriter.UploadCompletionListener upload listener} 
is also registered.
  * Upon notification it updates the Writer local state (for future persist 
calls) and completes the
@@ -90,6 +90,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 @NotThreadSafe
 class FsStateChangelogWriter implements 
StateChangelogWriter<ChangelogStateHandleStreamImpl> {
     private static final Logger LOG = 
LoggerFactory.getLogger(FsStateChangelogWriter.class);
+    private static final long DUMMY_PERSIST_CHECKPOINT = -1L;
     private static final SequenceNumber INITIAL_SQN = SequenceNumber.of(0L);
 
     private final UUID logId;
@@ -207,13 +208,13 @@ class FsStateChangelogWriter implements 
StateChangelogWriter<ChangelogStateHandl
 
     @Override
     public CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> 
persist(
-            SequenceNumber from) throws IOException {
+            SequenceNumber from, long checkpointId) throws IOException {
         LOG.debug(
                 "persist {} starting from sqn {} (incl.), active sqn: {}",
                 logId,
                 from,
                 activeSequenceNumber);
-        return persistInternal(from);
+        return persistInternal(from, checkpointId);
     }
 
     private void preEmptiveFlushIfNeeded(byte[] value) throws IOException {
@@ -222,17 +223,31 @@ class FsStateChangelogWriter implements 
StateChangelogWriter<ChangelogStateHandl
             LOG.debug(
                     "pre-emptively flush {}MB of appended changes to the 
common store",
                     activeChangeSetSize / 1024 / 1024);
-            persistInternal(notUploaded.isEmpty() ? activeSequenceNumber : 
notUploaded.firstKey());
+            persistInternal(
+                    notUploaded.isEmpty() ? activeSequenceNumber : 
notUploaded.firstKey(),
+                    DUMMY_PERSIST_CHECKPOINT);
         }
     }
 
     private CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> 
persistInternal(
-            SequenceNumber from) throws IOException {
+            SequenceNumber from, long checkpointId) throws IOException {
         ensureCanPersist(from);
         rollover();
         Map<SequenceNumber, StateChangeSet> toUpload = 
drainTailMap(notUploaded, from);
         NavigableMap<SequenceNumber, UploadResult> readyToReturn = 
uploaded.tailMap(from, true);
-        LOG.debug("collected readyToReturn: {}, toUpload: {}", readyToReturn, 
toUpload);
+        LOG.debug(
+                "collected readyToReturn: {}, toUpload: {}, checkpointId: {}.",
+                readyToReturn,
+                toUpload,
+                checkpointId);
+
+        if (checkpointId != DUMMY_PERSIST_CHECKPOINT) {
+            for (UploadResult uploadResult : readyToReturn.values()) {
+                if (uploadResult.localStreamHandle != null) {
+                    
localChangelogRegistry.register(uploadResult.localStreamHandle, checkpointId);
+                }
+            }
+        }
 
         SequenceNumberRange range = SequenceNumberRange.generic(from, 
activeSequenceNumber);
         if (range.size() == readyToReturn.size()) {
@@ -248,7 +263,7 @@ class FsStateChangelogWriter implements 
StateChangelogWriter<ChangelogStateHandl
                 UploadTask uploadTask =
                         new UploadTask(
                                 toUpload.values(),
-                                this::handleUploadSuccess,
+                                uploadResults -> 
handleUploadSuccess(uploadResults, checkpointId),
                                 this::handleUploadFailure);
                 uploader.upload(uploadTask);
             }
@@ -276,7 +291,7 @@ class FsStateChangelogWriter implements 
StateChangelogWriter<ChangelogStateHandl
                 "handleUploadFailure");
     }
 
-    private void handleUploadSuccess(List<UploadResult> results) {
+    private void handleUploadSuccess(List<UploadResult> results, long 
checkpointId) {
         mailboxExecutor.execute(
                 () -> {
                     if (closed) {
@@ -287,6 +302,12 @@ class FsStateChangelogWriter implements 
StateChangelogWriter<ChangelogStateHandl
                     } else {
                         uploadCompletionListeners.removeIf(listener -> 
listener.onSuccess(results));
                         for (UploadResult result : results) {
+                            if (checkpointId != DUMMY_PERSIST_CHECKPOINT) {
+                                if (result.localStreamHandle != null) {
+                                    localChangelogRegistry.register(
+                                            result.localStreamHandle, 
checkpointId);
+                                }
+                            }
                             SequenceNumber resultSqn = result.sequenceNumber;
                             if (resultSqn.compareTo(lowestSequenceNumber) >= 0
                                     && 
resultSqn.compareTo(highestSequenceNumber) < 0) {
@@ -373,19 +394,14 @@ class FsStateChangelogWriter implements 
StateChangelogWriter<ChangelogStateHandl
                 .forEach(
                         localHandle -> {
                             changelogRegistry.stopTracking(localHandle);
-                            localChangelogRegistry.register(localHandle, 
checkpointId);
                         });
         localChangelogRegistry.discardUpToCheckpoint(checkpointId);
     }
 
-    @Override
-    public void subsume(long checkpointId) {
-        localChangelogRegistry.discardUpToCheckpoint(checkpointId);
-    }
-
     @Override
     public void reset(SequenceNumber from, SequenceNumber to, long 
checkpointId) {
-        localChangelogRegistry.prune(checkpointId);
+        // Delete all accumulated local dstl files when the checkpoint is aborted.
+        localChangelogRegistry.discardUpToCheckpoint(checkpointId + 1);
     }
 
     private SnapshotResult<ChangelogStateHandleStreamImpl> buildSnapshotResult(
diff --git 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
index ebf99c338fd..97db57f6547 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
@@ -78,7 +78,7 @@ public class ChangelogStorageMetricsTest {
             for (int i = 0; i < numUploads; i++) {
                 SequenceNumber from = writer.nextSequenceNumber();
                 writer.append(0, new byte[] {0, 1, 2, 3});
-                writer.persist(from).get();
+                writer.persist(from, 1L).get();
             }
             
assertThat(metrics.getUploadsCounter().getCount()).isEqualTo(numUploads);
             
assertThat(metrics.getUploadLatenciesNanos().getStatistics().getMin()).isGreaterThan(0);
@@ -104,14 +104,14 @@ public class ChangelogStorageMetricsTest {
             // upload single byte to infer header size
             SequenceNumber from = writer.nextSequenceNumber();
             writer.append(0, new byte[] {0});
-            writer.persist(from).get();
+            writer.persist(from, 1L).get();
             long headerSize = 
metrics.getUploadSizes().getStatistics().getMin() - 1;
 
             byte[] upload = new byte[33];
             for (int i = 0; i < 5; i++) {
                 from = writer.nextSequenceNumber();
                 writer.append(0, upload);
-                writer.persist(from).get();
+                writer.persist(from, 1L).get();
             }
             long expected = upload.length + headerSize;
             
assertThat(metrics.getUploadSizes().getStatistics().getMax()).isEqualTo(expected);
@@ -140,7 +140,7 @@ public class ChangelogStorageMetricsTest {
                 SequenceNumber from = writer.nextSequenceNumber();
                 writer.append(0, new byte[] {0, 1, 2, 3});
                 try {
-                    writer.persist(from).get();
+                    writer.persist(from, 1L).get();
                 } catch (IOException e) {
                     // ignore
                 }
@@ -201,7 +201,7 @@ public class ChangelogStorageMetricsTest {
                     // cause actual uploads
                     SequenceNumber from = writers[writer].nextSequenceNumber();
                     writers[writer].append(0, new byte[] {0, 1, 2, 3});
-                    writers[writer].persist(from);
+                    writers[writer].persist(from, 1L);
                 }
                 // now the uploads should be grouped and executed at once
                 scheduler.triggerScheduledTasks();
@@ -248,7 +248,7 @@ public class ChangelogStorageMetricsTest {
             for (int upload = 0; upload < numUploads; upload++) {
                 SequenceNumber from = writer.nextSequenceNumber();
                 writer.append(0, new byte[] {0, 1, 2, 3});
-                writer.persist(from).get();
+                writer.persist(from, 1L).get();
             }
         } finally {
             storage.close();
@@ -293,7 +293,7 @@ public class ChangelogStorageMetricsTest {
             for (int upload = 0; upload < numUploads; upload++) {
                 SequenceNumber from = writer.nextSequenceNumber();
                 writer.append(0, new byte[] {0, 1, 2, 3});
-                writer.persist(from).get();
+                writer.persist(from, 1L).get();
             }
         } finally {
             storage.close();
@@ -359,7 +359,7 @@ public class ChangelogStorageMetricsTest {
             for (int i = 0; i < numUploads; i++) {
                 SequenceNumber from = writer.nextSequenceNumber();
                 writer.append(0, new byte[] {0});
-                writer.persist(from);
+                writer.persist(from, 1L);
             }
             assertThat((int) 
queueSizeGauge.get().getValue()).isEqualTo(numUploads);
             scheduler.triggerScheduledTasks();
diff --git 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/DiscardRecordableStateChangeUploader.java
 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/DiscardRecordableStateChangeUploader.java
index 90199f304f5..d263c751b4e 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/DiscardRecordableStateChangeUploader.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/DiscardRecordableStateChangeUploader.java
@@ -55,8 +55,9 @@ public class DiscardRecordableStateChangeUploader implements 
StateChangeUploader
 
         // fake StreamStateHandle without data, just for discarding records
         StreamStateHandle handle = new TestingStreamStateHandle();
+        StreamStateHandle localHandle = new TestingStreamStateHandle();
         changelogRegistry.startTracking(handle, numOfChangeSets);
-        return new UploadTasksResult(tasksOffsets, handle);
+        return new UploadTasksResult(tasksOffsets, handle, localHandle);
     }
 
     public boolean isDiscarded(StreamStateHandle handle) {
diff --git 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
index 4ea8f665002..b3beabb6ae2 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
@@ -153,6 +153,6 @@ public class FsStateChangelogWriterSqnTest {
     }
 
     private static void persistAll(FsStateChangelogWriter writer) throws 
IOException {
-        writer.persist(writer.initialSequenceNumber());
+        writer.persist(writer.initialSequenceNumber(), 1L);
     }
 }
diff --git 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java
 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java
index f5be2ad2deb..12f1df85d27 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
 import org.apache.flink.runtime.state.changelog.LocalChangelogRegistry;
+import org.apache.flink.runtime.state.changelog.LocalChangelogRegistryImpl;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 import org.apache.flink.util.concurrent.Executors;
 import org.apache.flink.util.function.BiConsumerWithException;
@@ -68,7 +69,7 @@ class FsStateChangelogWriterTest {
                     SequenceNumber sqn = append(writer, bytes);
                     assertSubmittedOnly(uploader, bytes);
                     uploader.reset();
-                    writer.persist(sqn);
+                    writer.persist(sqn, 1L);
                     assertNoUpload(uploader, "changes should have been 
pre-uploaded");
                 });
     }
@@ -79,7 +80,7 @@ class FsStateChangelogWriterTest {
                 (writer, uploader) -> {
                     byte[] bytes = getBytes();
                     
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> future =
-                            writer.persist(append(writer, bytes));
+                            writer.persist(append(writer, bytes), 1L);
                     assertSubmittedOnly(uploader, bytes);
                     uploader.completeUpload();
                     assertThat(
@@ -100,11 +101,11 @@ class FsStateChangelogWriterTest {
                 (writer, uploader) -> {
                     byte[] bytes = getBytes();
                     SequenceNumber sqn = append(writer, bytes);
-                    writer.persist(sqn);
+                    writer.persist(sqn, 1L);
                     uploader.completeUpload();
                     uploader.reset();
                     writer.confirm(sqn, writer.nextSequenceNumber(), 1L);
-                    writer.persist(sqn);
+                    writer.persist(sqn, 2L);
                     assertNoUpload(uploader, "confirmed changes shouldn't be 
re-uploaded");
                 });
     }
@@ -134,11 +135,12 @@ class FsStateChangelogWriterTest {
 
             writer.append(KEY_GROUP, getBytes(10)); // sqn: 0
 
+            long checkpointId = 1L;
             // checkpoint 1 trigger
             SequenceNumber checkpoint1sqn = writer.nextSequenceNumber();
-            writer.persist(initialSqn);
+            writer.persist(initialSqn, checkpointId);
             uploadScheduler.scheduleAll(); // checkpoint 1 completed
-            writer.confirm(initialSqn, checkpoint1sqn, 1);
+            writer.confirm(initialSqn, checkpoint1sqn, checkpointId);
 
             writer.append(KEY_GROUP, getBytes(10)); // sqn: 1
 
@@ -150,9 +152,9 @@ class FsStateChangelogWriterTest {
             // materialization 1 completed
             // checkpoint 2 trigger
             SequenceNumber checkpoint2sqn = writer.nextSequenceNumber();
-            writer.persist(materializationSqn);
+            writer.persist(materializationSqn, ++checkpointId);
             uploadScheduler.scheduleAll(); // checkpoint 2 completed
-            writer.confirm(materializationSqn, checkpoint2sqn, 2);
+            writer.confirm(materializationSqn, checkpoint2sqn, checkpointId);
 
             // checkpoint 1 subsumed
             writer.truncate(
@@ -164,9 +166,9 @@ class FsStateChangelogWriterTest {
 
             // checkpoint 3 trigger
             SequenceNumber checkpoint3sqn = writer.nextSequenceNumber();
-            writer.persist(materializationSqn);
+            writer.persist(materializationSqn, ++checkpointId);
             uploadScheduler.scheduleAll(); // checkpoint 3 completed
-            writer.confirm(materializationSqn, checkpoint3sqn, 3);
+            writer.confirm(materializationSqn, checkpoint3sqn, checkpointId);
 
             // trigger pre-emptive upload
             writer.append(KEY_GROUP, getBytes(100)); // sqn: 4
@@ -181,9 +183,9 @@ class FsStateChangelogWriterTest {
             // checkpoint 4 trigger
             SequenceNumber checkpoint4sqn = writer.nextSequenceNumber();
             CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> 
future =
-                    writer.persist(materializationSqn);
+                    writer.persist(materializationSqn, ++checkpointId);
             uploadScheduler.scheduleAll(); // checkpoint 4 completed
-            writer.confirm(materializationSqn, checkpoint4sqn, 4);
+            writer.confirm(materializationSqn, checkpoint4sqn, checkpointId);
 
             SnapshotResult<ChangelogStateHandleStreamImpl> result = 
future.get();
             ChangelogStateHandleStreamImpl resultHandle = 
result.getJobManagerOwnedSnapshot();
@@ -225,7 +227,7 @@ class FsStateChangelogWriterTest {
             // checkpoint 1 trigger
             SequenceNumber checkpoint1sqn = writer.nextSequenceNumber();
             CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> 
future =
-                    writer.persist(initialSqn);
+                    writer.persist(initialSqn, 1L);
 
             // trigger pre-emptive upload
             writer.append(KEY_GROUP, getBytes(100)); // sqn: 1
@@ -246,15 +248,228 @@ class FsStateChangelogWriterTest {
         }
     }
 
+    @Test
+    void testLocalFileDiscard() throws Exception {
+        long appendPersistThreshold = 100;
+        TaskChangelogRegistry taskChangelogRegistry =
+                new TaskChangelogRegistryImpl(Executors.directExecutor());
+
+        try (DiscardRecordableStateChangeUploader uploader =
+                        new 
DiscardRecordableStateChangeUploader(taskChangelogRegistry);
+                TestingBatchingUploadScheduler uploadScheduler =
+                        new TestingBatchingUploadScheduler(uploader);
+                FsStateChangelogWriter writer =
+                        new FsStateChangelogWriter(
+                                UUID.randomUUID(),
+                                KeyGroupRange.of(KEY_GROUP, KEY_GROUP),
+                                uploadScheduler,
+                                appendPersistThreshold,
+                                new SyncMailboxExecutor(),
+                                taskChangelogRegistry,
+                                TestLocalRecoveryConfig.enabledForTest(),
+                                new LocalChangelogRegistryImpl(
+                                        
Executors.newDirectExecutorService()))) {
+            SequenceNumber initialSqn = writer.initialSequenceNumber();
+
+            writer.append(KEY_GROUP, getBytes(10));
+
+            long checkpointId = 1L;
+            // checkpoint 1 trigger
+            SequenceNumber checkpoint1sqn = writer.nextSequenceNumber();
+            writer.persist(initialSqn, checkpointId);
+            uploadScheduler.scheduleAll(); // checkpoint 1 completed
+            writer.confirm(initialSqn, checkpoint1sqn, checkpointId);
+
+            // trigger pre-emptive upload
+            writer.append(KEY_GROUP, getBytes(100));
+            uploadScheduler.scheduleAll();
+            writer.append(KEY_GROUP, getBytes(10));
+            // checkpoint 2 trigger
+            SequenceNumber checkpoint2sqn = writer.nextSequenceNumber();
+            CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> 
future2 =
+                    writer.persist(initialSqn, ++checkpointId);
+            uploadScheduler.scheduleAll(); // checkpoint 2 completed
+            writer.confirm(initialSqn, checkpoint2sqn, checkpointId);
+            SnapshotResult<ChangelogStateHandleStreamImpl> result2 = 
future2.get();
+            for (Tuple2<StreamStateHandle, Long> handleAndOffset :
+                    result2.getTaskLocalSnapshot().getHandlesAndOffsets()) {
+                assertThat(uploader.isDiscarded(handleAndOffset.f0)).isFalse();
+            }
+
+            // materialization 1 trigger
+            SequenceNumber materializationSqn = writer.nextSequenceNumber();
+            writer.append(KEY_GROUP, getBytes(10));
+
+            // materialization 1 completed
+            // checkpoint 3 trigger
+            SequenceNumber checkpoint3sqn = writer.nextSequenceNumber();
+            writer.persist(materializationSqn, ++checkpointId);
+            uploadScheduler.scheduleAll(); // checkpoint 3 completed
+            writer.confirm(materializationSqn, checkpoint3sqn, checkpointId);
+            for (Tuple2<StreamStateHandle, Long> handleAndOffset :
+                    result2.getTaskLocalSnapshot().getHandlesAndOffsets()) {
+                assertThat(uploader.isDiscarded(handleAndOffset.f0)).isTrue();
+            }
+        }
+    }
+
+    @Test
+    void testLocalFileAfterMaterialize() throws Exception {
+        // If local files were registered in confirm(), the following case would 
fail:
+        // cp1 trigger: file1,file1'(local)
+        // JM: register [file1] to sharedRegistry
+        // cp1 complete: stopTracking [file1], register [file1'] to 
localRegistry
+        // cp2 trigger: file1,file1',file2,file2'
+        // JM: register [file1,file2] to sharedRegistry
+        // cp2 complete: stopTracking [file1,file1',file2,file2'], register 
[file1',file2'] to
+        // localRegistry
+        // cp1 subsume
+        // cp3 trigger:  file1,file1',file2,file2',file3,file3'
+        // materialization: uploaded.clear()
+        // JM: register [file1,file2,file3] to sharedRegistry
+        // cp3 complete: stopTracking [], register [] to localRegistry
+        // cp2 subsume: [file1', file2'] are discarded
+        // if restore from cp3: local file1',file2' are not found
+        long appendPersistThreshold = 100;
+        TaskChangelogRegistry taskChangelogRegistry =
+                new TaskChangelogRegistryImpl(Executors.directExecutor());
+
+        try (DiscardRecordableStateChangeUploader uploader =
+                        new 
DiscardRecordableStateChangeUploader(taskChangelogRegistry);
+                TestingBatchingUploadScheduler uploadScheduler =
+                        new TestingBatchingUploadScheduler(uploader);
+                FsStateChangelogWriter writer =
+                        new FsStateChangelogWriter(
+                                UUID.randomUUID(),
+                                KeyGroupRange.of(KEY_GROUP, KEY_GROUP),
+                                uploadScheduler,
+                                appendPersistThreshold,
+                                new SyncMailboxExecutor(),
+                                taskChangelogRegistry,
+                                TestLocalRecoveryConfig.enabledForTest(),
+                                new LocalChangelogRegistryImpl(
+                                        
Executors.newDirectExecutorService()))) {
+            SequenceNumber initialSqn = writer.initialSequenceNumber();
+
+            writer.append(KEY_GROUP, getBytes(10));
+
+            long checkpointId = 1L;
+            // checkpoint 1 trigger
+            SequenceNumber checkpoint1sqn = writer.nextSequenceNumber();
+            writer.persist(initialSqn, checkpointId);
+            uploadScheduler.scheduleAll(); // checkpoint 1 completed
+            writer.confirm(initialSqn, checkpoint1sqn, checkpointId);
+
+            writer.append(KEY_GROUP, getBytes(10));
+            // checkpoint 2 trigger
+            SequenceNumber checkpoint2sqn = writer.nextSequenceNumber();
+            writer.persist(initialSqn, ++checkpointId);
+            uploadScheduler.scheduleAll(); // checkpoint 2 completed
+            writer.confirm(initialSqn, checkpoint2sqn, checkpointId);
+
+            writer.append(KEY_GROUP, getBytes(10));
+            // checkpoint 3 trigger
+            SequenceNumber checkpoint3sqn = writer.nextSequenceNumber();
+            CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> 
future3 =
+                    writer.persist(initialSqn, ++checkpointId);
+            uploadScheduler.scheduleAll(); // checkpoint 3 completed
+
+            // materialization 1 trigger
+            SequenceNumber materializationSqn = writer.nextSequenceNumber();
+            writer.truncate(
+                    materializationSqn.compareTo(checkpoint1sqn) < 0
+                            ? materializationSqn
+                            : checkpoint1sqn);
+            // materialization 1 completed
+            // checkpoint 3 confirm
+            writer.confirm(materializationSqn, checkpoint3sqn, checkpointId);
+
+            writer.append(KEY_GROUP, getBytes(10));
+
+            SnapshotResult<ChangelogStateHandleStreamImpl> result3 = 
future3.get();
+
+            for (Tuple2<StreamStateHandle, Long> handleAndOffset :
+                    
result3.getJobManagerOwnedSnapshot().getHandlesAndOffsets()) {
+                assertThat(uploader.isDiscarded(handleAndOffset.f0)).isFalse();
+            }
+
+            for (Tuple2<StreamStateHandle, Long> handleAndOffset :
+                    result3.getTaskLocalSnapshot().getHandlesAndOffsets()) {
+                assertThat(uploader.isDiscarded(handleAndOffset.f0)).isFalse();
+            }
+        }
+    }
+
+    @Test
+    void testLocalFileAbort() throws Exception {
+        long appendPersistThreshold = 100;
+        TaskChangelogRegistry taskChangelogRegistry =
+                new TaskChangelogRegistryImpl(Executors.directExecutor());
+
+        try (DiscardRecordableStateChangeUploader uploader =
+                        new DiscardRecordableStateChangeUploader(taskChangelogRegistry);
+                TestingBatchingUploadScheduler uploadScheduler =
+                        new TestingBatchingUploadScheduler(uploader);
+                FsStateChangelogWriter writer =
+                        new FsStateChangelogWriter(
+                                UUID.randomUUID(),
+                                KeyGroupRange.of(KEY_GROUP, KEY_GROUP),
+                                uploadScheduler,
+                                appendPersistThreshold,
+                                new SyncMailboxExecutor(),
+                                taskChangelogRegistry,
+                                TestLocalRecoveryConfig.enabledForTest(),
+                                new LocalChangelogRegistryImpl(
+                                        Executors.newDirectExecutorService()))) {
+            SequenceNumber initialSqn = writer.initialSequenceNumber();
+
+            writer.append(KEY_GROUP, getBytes(10));
+
+            long checkpointId = 1L;
+            // checkpoint 1 trigger
+            SequenceNumber checkpoint1sqn = writer.nextSequenceNumber();
+            writer.persist(initialSqn, checkpointId);
+            uploadScheduler.scheduleAll(); // checkpoint 1 completed
+            writer.confirm(initialSqn, checkpoint1sqn, checkpointId);
+
+            writer.append(KEY_GROUP, getBytes(200));
+            uploadScheduler.scheduleAll();
+            writer.append(KEY_GROUP, getBytes(10));
+            // checkpoint 2 trigger
+            SequenceNumber checkpoint2sqn = writer.nextSequenceNumber();
+            CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> future2 =
+                    writer.persist(initialSqn, ++checkpointId);
+            uploadScheduler.scheduleAll();
+
+            // checkpoint 2 abort, all local dstl files are deleted
+            writer.reset(initialSqn, checkpoint2sqn, checkpointId);
+            SnapshotResult<ChangelogStateHandleStreamImpl> result2 = future2.get();
+            assertThat(result2.getTaskLocalSnapshot().getHandlesAndOffsets())
+                    .allMatch(tuple -> uploader.isDiscarded(tuple.f0));
+
+            writer.append(KEY_GROUP, getBytes(10));
+            // checkpoint 3 trigger
+            SequenceNumber checkpoint3sqn = writer.nextSequenceNumber();
+            CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> future3 =
+                    writer.persist(initialSqn, ++checkpointId);
+            uploadScheduler.scheduleAll(); // checkpoint 3 completed
+            SnapshotResult<ChangelogStateHandleStreamImpl> result3 = future3.get();
+            // checkpoint 3 confirm, delete files of checkpoint 1,2
+            writer.confirm(initialSqn, checkpoint3sqn, checkpointId);
+            assertThat(result3.getTaskLocalSnapshot().getHandlesAndOffsets())
+                    .anyMatch(tuple -> !uploader.isDiscarded(tuple.f0));
+        }
+    }
+
     @Test
     void testNoReUploadBeforeCompletion() throws Exception {
         withWriter(
                 (writer, uploader) -> {
                     byte[] bytes = getBytes();
                     SequenceNumber sqn = append(writer, bytes);
-                    writer.persist(sqn);
+                    writer.persist(sqn, 1L);
                     uploader.reset();
-                    writer.persist(sqn);
+                    writer.persist(sqn, 2L);
                     assertNoUpload(uploader, "no re-upload should happen");
                 });
     }
@@ -264,11 +479,11 @@ class FsStateChangelogWriterTest {
         withWriter(
                 (writer, uploader) -> {
                     SequenceNumber sqn = append(writer, getBytes());
-                    writer.persist(sqn);
+                    writer.persist(sqn, 1L);
                     uploader.reset();
                     byte[] bytes = getBytes();
                     sqn = append(writer, bytes);
-                    writer.persist(sqn);
+                    writer.persist(sqn, 2L);
                     assertSubmittedOnly(uploader, bytes);
                 });
     }
@@ -282,7 +497,7 @@ class FsStateChangelogWriterTest {
                     SequenceNumber sqn = append(writer, bytes);
                     writer.reset(sqn, SequenceNumber.of(Long.MAX_VALUE), Long.MAX_VALUE);
                     uploader.reset();
-                    writer.persist(sqn);
+                    writer.persist(sqn, 1L);
                     assertSubmittedOnly(uploader, bytes);
                 });
     }
@@ -298,7 +513,7 @@ class FsStateChangelogWriterTest {
                                             CompletableFuture<
                                                             SnapshotResult<
                                                                     
ChangelogStateHandleStreamImpl>>
-                                                    future = 
writer.persist(sqn);
+                                                    future = 
writer.persist(sqn, 1L);
                                             uploader.failUpload(new 
RuntimeException("test"));
                                             try {
                                                 future.get();
@@ -317,9 +532,9 @@ class FsStateChangelogWriterTest {
                                         (writer, uploader) -> {
                                             byte[] bytes = getBytes();
                                             SequenceNumber sqn = 
append(writer, bytes);
-                                            writer.persist(sqn); // future 
result ignored
+                                            writer.persist(sqn, 1L); // future 
result ignored
                                             uploader.failUpload(new 
RuntimeException("test"));
-                                            writer.persist(sqn); // should 
fail right away
+                                            writer.persist(sqn, 2L); // should 
fail right away
                                         }))
                 .isInstanceOf(IOException.class);
     }
@@ -330,12 +545,12 @@ class FsStateChangelogWriterTest {
                 (writer, uploader) -> {
                     byte[] bytes = getBytes();
                     SequenceNumber sqn1 = append(writer, bytes);
-                    writer.persist(sqn1); // future result ignored
+                    writer.persist(sqn1, 1L); // future result ignored
                     uploader.failUpload(new RuntimeException("test"));
                     uploader.reset();
                     SequenceNumber sqn2 = append(writer, bytes);
                     
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> future =
-                            writer.persist(sqn2);
+                            writer.persist(sqn2, 2L);
                     uploader.completeUpload();
                     future.get();
                 });
@@ -349,7 +564,7 @@ class FsStateChangelogWriterTest {
                                         (writer, uploader) -> {
                                             SequenceNumber sqn = 
append(writer, getBytes());
                                             writer.truncate(sqn.next());
-                                            writer.persist(sqn);
+                                            writer.persist(sqn, 1L);
                                         }))
                 .isInstanceOf(IllegalArgumentException.class);
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistry.java
index a2ffb66b84d..678667951f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistry.java
@@ -21,11 +21,15 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.state.StreamStateHandle;
 
+import java.io.Closeable;
+import java.io.IOException;
+
 /** This registry is responsible for deleting changlog's local handles which are not in use. */
 @Internal
-public interface LocalChangelogRegistry {
+public interface LocalChangelogRegistry extends Closeable {
     LocalChangelogRegistry NO_OP =
             new LocalChangelogRegistry() {
+
                 @Override
                 public void register(StreamStateHandle handle, long checkpointID) {}
 
@@ -33,7 +37,7 @@ public interface LocalChangelogRegistry {
                 public void discardUpToCheckpoint(long upTo) {}
 
                 @Override
-                public void prune(long checkpointID) {}
+                public void close() throws IOException {}
             };
 
     /**
@@ -54,11 +58,4 @@ public interface LocalChangelogRegistry {
      * @param upTo lowest CheckpointID which is still valid.
      */
     void discardUpToCheckpoint(long upTo);
-
-    /**
-     * Called upon ChangelogKeyedStateBackend#notifyCheckpointAborted.
-     *
-     * @param checkpointID to abort
-     */
-    void prune(long checkpointID);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryImpl.java
index acb8ae318ca..0d6338cf14c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryImpl.java
@@ -26,15 +26,14 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.stream.Collectors;
 
 @Internal
 public class LocalChangelogRegistryImpl implements LocalChangelogRegistry {
@@ -47,9 +46,9 @@ public class LocalChangelogRegistryImpl implements 
LocalChangelogRegistry {
             handleToLastUsedCheckpointID = new ConcurrentHashMap<>();
 
     /** Executor for async state deletion. */
-    private final Executor asyncDisposalExecutor;
+    private final ExecutorService asyncDisposalExecutor;
 
-    public LocalChangelogRegistryImpl(Executor ioExecutor) {
+    public LocalChangelogRegistryImpl(ExecutorService ioExecutor) {
         this.asyncDisposalExecutor = ioExecutor;
     }
 
@@ -84,17 +83,6 @@ public class LocalChangelogRegistryImpl implements 
LocalChangelogRegistry {
         }
     }
 
-    public void prune(long checkpointID) {
-        Set<StreamStateHandle> handles =
-                handleToLastUsedCheckpointID.values().stream()
-                        .filter(tuple -> tuple.f1 == checkpointID)
-                        .map(tuple -> tuple.f0)
-                        .collect(Collectors.toSet());
-        for (StreamStateHandle handle : handles) {
-            scheduleAsyncDelete(handle);
-        }
-    }
-
     private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) {
         if (streamStateHandle != null) {
             LOG.trace("Scheduled delete of state handle {}.", 
streamStateHandle);
@@ -115,4 +103,10 @@ public class LocalChangelogRegistryImpl implements 
LocalChangelogRegistry {
             }
         }
     }
+
+    @Override
+    public void close() throws IOException {
+        asyncDisposalExecutor.shutdown();
+        handleToLastUsedCheckpointID.clear();
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
index 337047d6d05..2c633f69e89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
@@ -50,8 +50,10 @@ public interface StateChangelogWriter<Handle extends 
ChangelogStateHandle> exten
      * be called for the corresponding change set. with reset/truncate/confirm 
methods?
      *
      * @param from inclusive
+     * @param checkpointId to persist
      */
-    CompletableFuture<SnapshotResult<Handle>> persist(SequenceNumber from) throws IOException;
+    CompletableFuture<SnapshotResult<Handle>> persist(SequenceNumber from, long checkpointId)
+            throws IOException;
 
     /**
      * Truncate this state changelog to free up the resources and collect any 
garbage. That means:
@@ -79,13 +81,6 @@ public interface StateChangelogWriter<Handle extends 
ChangelogStateHandle> exten
      */
     void confirm(SequenceNumber from, SequenceNumber to, long checkpointId);
 
-    /**
-     * Mark the state changes of the given checkpoint as subsumed.
-     *
-     * @param checkpointId
-     */
-    void subsume(long checkpointId);
-
     /**
      * Reset the state the given state changes. Called upon abortion so that 
if requested later then
      * these changes will be re-uploaded.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
index 24d675d9a6c..62cd9f365d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
@@ -95,7 +95,7 @@ class InMemoryStateChangelogWriter implements 
StateChangelogWriter<InMemoryChang
 
     @Override
     public CompletableFuture<SnapshotResult<InMemoryChangelogStateHandle>> 
persist(
-            SequenceNumber from) {
+            SequenceNumber from, long checkpointId) {
         LOG.debug("Persist after {}", from);
         Preconditions.checkNotNull(from);
         return completedFuture(
@@ -145,9 +145,6 @@ class InMemoryStateChangelogWriter implements 
StateChangelogWriter<InMemoryChang
     @Override
     public void confirm(SequenceNumber from, SequenceNumber to, long 
checkpointID) {}
 
-    @Override
-    public void subsume(long checkpointId) {}
-
     @Override
     public void reset(SequenceNumber from, SequenceNumber to, long 
checkpointID) {}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java
index 62f3f37bda2..a0782edd10f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java
@@ -27,6 +27,10 @@ public class TestLocalRecoveryConfig {
         return new LocalRecoveryConfig(null);
     }
 
+    public static LocalRecoveryConfig enabledForTest() {
+        return new LocalRecoveryConfig(new TestDummyLocalDirectoryProvider());
+    }
+
     public static class TestDummyLocalDirectoryProvider implements 
LocalRecoveryDirectoryProvider {
 
         private TestDummyLocalDirectoryProvider() {}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryTest.java
index d28c33e603a..bb04e0c338d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryTest.java
@@ -33,7 +33,7 @@ public class LocalChangelogRegistryTest extends TestLogger {
     @Test
     public void testRegistryNormal() {
         LocalChangelogRegistry localStateRegistry =
-                new LocalChangelogRegistryImpl(Executors.directExecutor());
+                new 
LocalChangelogRegistryImpl(Executors.newDirectExecutorService());
         TestingStreamStateHandle handle1 = new TestingStreamStateHandle();
         TestingStreamStateHandle handle2 = new TestingStreamStateHandle();
         // checkpoint 1: handle1, handle2
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
index 44e8ed8319a..c0f0ebe6a43 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
@@ -100,7 +100,7 @@ public class StateChangelogStorageTest<T extends 
ChangelogStateHandle> {
                 writer.nextSequenceNumber();
             }
 
-            SnapshotResult<T> res = writer.persist(prev).get();
+            SnapshotResult<T> res = writer.persist(prev, 1).get();
             T jmHandle = res.getJobManagerOwnedSnapshot();
             StateChangelogHandleReader<T> reader = client.createReader();
             assertByteMapsEqual(appendsByKeyGroup, extract(jmHandle, reader));
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
index 77d0f33b61b..dbce58687d5 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
@@ -403,7 +403,7 @@ public class ChangelogKeyedStateBackend<K>
 
         return toRunnableFuture(
                 stateChangelogWriter
-                        .persist(lastUploadedFrom)
+                        .persist(lastUploadedFrom, checkpointId)
                         .thenApply(
                                 delta ->
                                         buildSnapshotResult(
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogTruncateHelper.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogTruncateHelper.java
index 12b5d3388f2..6a6b71868b5 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogTruncateHelper.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogTruncateHelper.java
@@ -70,7 +70,6 @@ class ChangelogTruncateHelper {
             subsumedUpTo = sqn;
             checkpointedUpTo.headMap(checkpointId, true).clear();
             truncate();
-            stateChangelogWriter.subsume(checkpointId);
         }
     }
 
diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
index e00b85c1ff8..bd57c8a3680 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
@@ -125,7 +125,8 @@ abstract class StateChangeLoggerTestBase<Namespace> {
         }
 
         @Override
-        public CompletableFuture<?> persist(SequenceNumber from) throws 
IOException {
+        public CompletableFuture<?> persist(SequenceNumber from, long 
checkpointId)
+                throws IOException {
             throw new UnsupportedOperationException();
         }
 
@@ -135,9 +136,6 @@ abstract class StateChangeLoggerTestBase<Namespace> {
         @Override
         public void confirm(SequenceNumber from, SequenceNumber to, long 
checkpointId) {}
 
-        @Override
-        public void subsume(long checkpointId) {}
-
         @Override
         public void reset(SequenceNumber from, SequenceNumber to, long 
checkpointId) {}
 

Reply via email to