This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.0 by this push:
     new a8dac03c829 [FLINK-38574][checkpoint] Avoid reusing re-uploaded sst 
files when checkpoint notification is delayed (#27157)
a8dac03c829 is described below

commit a8dac03c829d877cbfd448d6b4add39283105452
Author: Zakelly <[email protected]>
AuthorDate: Fri Oct 31 17:22:43 2025 +0800

    [FLINK-38574][checkpoint] Avoid reusing re-uploaded sst files when 
checkpoint notification is delayed (#27157)
---
 .../snapshot/ForStIncrementalSnapshotStrategy.java | 15 ++--
 .../forst/snapshot/ForStSnapshotStrategyBase.java  | 94 ++++++++++++++++++++--
 .../ForStIncrementalSnapshotStrategyTest.java      | 35 ++++++++
 .../snapshot/RocksDBSnapshotStrategyBase.java      | 90 +++++++++++++++++++--
 .../snapshot/RocksIncrementalSnapshotStrategy.java | 16 ++--
 .../RocksIncrementalSnapshotStrategyTest.java      | 35 ++++++++
 6 files changed, 257 insertions(+), 28 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategy.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategy.java
index 253381c801a..45171f04700 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategy.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategy.java
@@ -196,30 +196,29 @@ public class ForStIncrementalSnapshotStrategy<K> extends 
ForStNativeFullSnapshot
             long checkpointId, @Nonnull List<StateMetaInfoSnapshot> 
stateMetaInfoSnapshots) {
 
         final long lastCompletedCheckpoint;
-        final Collection<HandleAndLocalPath> confirmedSstFiles;
+        final SortedMap<Long, Collection<HandleAndLocalPath>> 
currentUploadedSstFiles;
 
         // use the last completed checkpoint as the comparison base.
         synchronized (uploadedSstFiles) {
             lastCompletedCheckpoint = lastCompletedCheckpointId;
-            confirmedSstFiles = uploadedSstFiles.get(lastCompletedCheckpoint);
-            LOG.trace(
-                    "Use confirmed SST files for checkpoint {}: {}",
-                    checkpointId,
-                    confirmedSstFiles);
+            currentUploadedSstFiles =
+                    new 
TreeMap<>(uploadedSstFiles.tailMap(lastCompletedCheckpoint));
         }
+        PreviousSnapshot previousSnapshot =
+                new PreviousSnapshot(currentUploadedSstFiles, 
lastCompletedCheckpoint);
         LOG.trace(
                 "Taking incremental snapshot for checkpoint {}. Snapshot is 
based on last completed checkpoint {} "
                         + "assuming the following (shared) confirmed files as 
base: {}.",
                 checkpointId,
                 lastCompletedCheckpoint,
-                confirmedSstFiles);
+                previousSnapshot);
 
         // snapshot meta data to save
         for (Map.Entry<String, ForStOperationUtils.ForStKvStateInfo> 
stateMetaInfoEntry :
                 kvStateInformation.entrySet()) {
             
stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().metaInfo.snapshot());
         }
-        return new PreviousSnapshot(confirmedSstFiles);
+        return previousSnapshot;
     }
 
     /** Encapsulates the process to perform an incremental snapshot of a 
ForStKeyedStateBackend. */
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java
index 7350e8aaf6c..78c1a2e3e73 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java
@@ -46,6 +46,8 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.ResourceGuard;
 
 import org.forstdb.RocksDB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
@@ -58,6 +60,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.SortedMap;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
@@ -70,6 +73,8 @@ import java.util.stream.Collectors;
 public abstract class ForStSnapshotStrategyBase<K, R extends SnapshotResources>
         implements CheckpointListener, SnapshotStrategy<KeyedStateHandle, R>, 
AutoCloseable {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(ForStSnapshotStrategyBase.class);
+
     @Nonnull private final String description;
 
     /** ForSt instance from the backend. */
@@ -251,22 +256,94 @@ public abstract class ForStSnapshotStrategyBase<K, R 
extends SnapshotResources>
     }
 
     protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT =
-            new PreviousSnapshot(Collections.emptyList());
+            new PreviousSnapshot(null, -1L);
 
     /** Previous snapshot with uploaded sst files. */
     protected static class PreviousSnapshot {
 
         @Nonnull private final Map<String, StreamStateHandle> 
confirmedSstFiles;
 
-        protected PreviousSnapshot(@Nullable Collection<HandleAndLocalPath> 
confirmedSstFiles) {
+        /**
+         * Constructor of PreviousSnapshot. Given a map of uploaded sst files 
in previous
+         * checkpoints, prunes the sst files which have been re-uploaded in the 
following
+         * checkpoints. The pruning resolves the mismatch 
between TM and JM due to
+         * notification delay. Consider the following steps, for example:
+         *
+         * <ul>
+         *   <li>1) checkpoint 1 uses file 00001.SST uploaded as xxx.sst.
+         *   <li>2) checkpoint 2 uses the same file 00001.SST but re-uploads 
it as yyy.sst because
+         *       CP 1 wasn't yet confirmed.
+         *   <li>3) TM gets a confirmation of checkpoint 1.
+         *   <li>4) JM completes checkpoint 2 and subsumes checkpoint 1 - 
removing xxx.sst.
+         *   <li>5) checkpoint 3 tries to re-use file 00001.SST uploaded as 
xxx.sst in checkpoint 1,
+         *       but it was deleted in (4) by JM.
+         * </ul>
+         *
+         * @param currentUploadedSstFiles the sst files uploaded in previous 
checkpoints.
+         * @param lastCompletedCheckpoint the last completed checkpoint id.
+         */
+        protected PreviousSnapshot(
+                @Nullable SortedMap<Long, Collection<HandleAndLocalPath>> 
currentUploadedSstFiles,
+                long lastCompletedCheckpoint) {
             this.confirmedSstFiles =
-                    confirmedSstFiles != null
-                            ? confirmedSstFiles.stream()
+                    currentUploadedSstFiles != null
+                            ? pruneFirstCheckpointSstFiles(
+                                    currentUploadedSstFiles, 
lastCompletedCheckpoint)
+                            : Collections.emptyMap();
+        }
+
+        /**
+         * The last completed checkpoint's uploaded sst files are all 
included, then for each
+         * following checkpoint, if a sst file has been re-uploaded, remove it 
from the first
+         * checkpoint's sst files.
+         *
+         * @param currentUploadedSstFiles sst files uploaded by the last 
completed checkpoint and the following ones, keyed by checkpoint id.
+         * @param lastCompletedCheckpoint the last completed checkpoint id.
+         */
+        private Map<String, StreamStateHandle> pruneFirstCheckpointSstFiles(
+                @Nonnull SortedMap<Long, Collection<HandleAndLocalPath>> 
currentUploadedSstFiles,
+                long lastCompletedCheckpoint) {
+            Map<String, StreamStateHandle> prunedSstFiles = null;
+            int removedCount = 0;
+            for (Map.Entry<Long, Collection<HandleAndLocalPath>> entry :
+                    currentUploadedSstFiles.entrySet()) {
+                // Iterate checkpoints in ascending order of checkpoint id.
+                if (entry.getKey() == lastCompletedCheckpoint) {
+                    // The first checkpoint's uploaded sst files are all 
included.
+                    prunedSstFiles =
+                            entry.getValue().stream()
                                     .collect(
                                             Collectors.toMap(
                                                     
HandleAndLocalPath::getLocalPath,
-                                                    
HandleAndLocalPath::getHandle))
-                            : Collections.emptyMap();
+                                                    
HandleAndLocalPath::getHandle));
+                } else if (prunedSstFiles == null) {
+                    // The last completed checkpoint's uploaded sst files do 
not exist.
+                    // So we skip the pruning process.
+                    break;
+                } else if (!prunedSstFiles.isEmpty()) {
+                    // Prune sst files which have been re-uploaded in the 
following checkpoints.
+                    for (HandleAndLocalPath handleAndLocalPath : 
entry.getValue()) {
+                        if (!(handleAndLocalPath.getHandle()
+                                instanceof PlaceholderStreamStateHandle)) {
+                            // If it's not a placeholder handle, it means the 
sst file has been
+                            // re-uploaded in the following checkpoint.
+                            if 
(prunedSstFiles.remove(handleAndLocalPath.getLocalPath()) != null) {
+                                removedCount++;
+                            }
+                        }
+                    }
+                }
+            }
+            if (removedCount > 0 && LOG.isTraceEnabled()) {
+                LOG.trace(
+                        "Removed {} re-uploaded sst files from base file set 
for incremental "
+                                + "checkpoint. Base checkpoint id: {}",
+                        removedCount,
+                        currentUploadedSstFiles.firstKey());
+            }
+            return (prunedSstFiles != null && !prunedSstFiles.isEmpty())
+                    ? Collections.unmodifiableMap(prunedSstFiles)
+                    : Collections.emptyMap();
         }
 
         protected Optional<StreamStateHandle> getUploaded(String filename) {
@@ -290,5 +367,10 @@ public abstract class ForStSnapshotStrategyBase<K, R 
extends SnapshotResources>
         protected boolean isEmpty() {
             return confirmedSstFiles.isEmpty();
         }
+
+        @Override
+        public String toString() {
+            return "PreviousSnapshot{" + "confirmedSstFiles=" + 
confirmedSstFiles + '}';
+        }
     }
 }
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java
index acd19b48b7e..034df702554 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java
@@ -105,6 +105,41 @@ class ForStIncrementalSnapshotStrategyTest {
         }
     }
 
+    @Test
+    void testCheckpointIsIncrementalWithLateNotification() throws Exception {
+
+        try (CloseableRegistry closeableRegistry = new CloseableRegistry();
+                ForStSnapshotStrategyBase<?, ?> checkpointSnapshotStrategy =
+                        createSnapshotStrategy()) {
+            FsCheckpointStreamFactory checkpointStreamFactory = 
createFsCheckpointStreamFactory();
+
+            // make checkpoint with id 1
+            snapshot(1L, checkpointSnapshotStrategy, checkpointStreamFactory, 
closeableRegistry);
+
+            // make checkpoint with id 2
+            snapshot(2L, checkpointSnapshotStrategy, checkpointStreamFactory, 
closeableRegistry);
+
+            // Late notify checkpoint with id 1
+            checkpointSnapshotStrategy.notifyCheckpointComplete(1L);
+
+            // make checkpoint with id 3, based on checkpoint 1
+            IncrementalRemoteKeyedStateHandle 
incrementalRemoteKeyedStateHandle3 =
+                    snapshot(
+                            3L,
+                            checkpointSnapshotStrategy,
+                            checkpointStreamFactory,
+                            closeableRegistry);
+
+            // Late notify checkpoint with id 2
+            checkpointSnapshotStrategy.notifyCheckpointComplete(2L);
+
+            // 3rd checkpoint is based on 1st checkpoint, BUT the 2nd 
checkpoint re-uploaded the 1st
+            // one, so it should be based on nothing, thus this is effectively 
a full checkpoint.
+            assertThat(incrementalRemoteKeyedStateHandle3.getStateSize())
+                    
.isEqualTo(incrementalRemoteKeyedStateHandle3.getCheckpointedSize());
+        }
+    }
+
     @Test
     void testCheckpointIsFull() throws Exception {
 
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/snapshot/RocksDBSnapshotStrategyBase.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/snapshot/RocksDBSnapshotStrategyBase.java
index e06cd9896fa..85619a669df 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/snapshot/RocksDBSnapshotStrategyBase.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/snapshot/RocksDBSnapshotStrategyBase.java
@@ -68,6 +68,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.SortedMap;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
@@ -394,22 +395,94 @@ public abstract class RocksDBSnapshotStrategyBase<K, R 
extends SnapshotResources
     }
 
     protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT =
-            new PreviousSnapshot(Collections.emptyList());
+            new PreviousSnapshot(null, -1L);
 
     /** Previous snapshot with uploaded sst files. */
     protected static class PreviousSnapshot {
 
         @Nonnull private final Map<String, StreamStateHandle> 
confirmedSstFiles;
 
-        protected PreviousSnapshot(@Nullable Collection<HandleAndLocalPath> 
confirmedSstFiles) {
+        /**
+         * Constructor of PreviousSnapshot. Given a map of uploaded sst files 
in previous
+         * checkpoints, prunes the sst files which have been re-uploaded in the 
following
+         * checkpoints. The pruning resolves the mismatch 
between TM and JM due to
+         * notification delay. Consider the following steps, for example:
+         *
+         * <ul>
+         *   <li>1) checkpoint 1 uses file 00001.SST uploaded as xxx.sst.
+         *   <li>2) checkpoint 2 uses the same file 00001.SST but re-uploads 
it as yyy.sst because
+         *       CP 1 wasn't yet confirmed.
+         *   <li>3) TM gets a confirmation of checkpoint 1.
+         *   <li>4) JM completes checkpoint 2 and subsumes checkpoint 1 - 
removing xxx.sst.
+         *   <li>5) checkpoint 3 tries to re-use file 00001.SST uploaded as 
xxx.sst in checkpoint 1,
+         *       but it was deleted in (4) by JM.
+         * </ul>
+         *
+         * @param currentUploadedSstFiles the sst files uploaded in previous 
checkpoints.
+         * @param lastCompletedCheckpoint the last completed checkpoint id.
+         */
+        protected PreviousSnapshot(
+                @Nullable SortedMap<Long, Collection<HandleAndLocalPath>> 
currentUploadedSstFiles,
+                long lastCompletedCheckpoint) {
             this.confirmedSstFiles =
-                    confirmedSstFiles != null
-                            ? confirmedSstFiles.stream()
+                    currentUploadedSstFiles != null
+                            ? pruneFirstCheckpointSstFiles(
+                                    currentUploadedSstFiles, 
lastCompletedCheckpoint)
+                            : Collections.emptyMap();
+        }
+
+        /**
+         * The last completed checkpoint's uploaded sst files are all 
included, then for each
+         * following checkpoint, if a sst file has been re-uploaded, remove it 
from the first
+         * checkpoint's sst files.
+         *
+         * @param currentUploadedSstFiles sst files uploaded by the last 
completed checkpoint and the following ones, keyed by checkpoint id.
+         * @param lastCompletedCheckpoint the last completed checkpoint id.
+         */
+        private Map<String, StreamStateHandle> pruneFirstCheckpointSstFiles(
+                @Nonnull SortedMap<Long, Collection<HandleAndLocalPath>> 
currentUploadedSstFiles,
+                long lastCompletedCheckpoint) {
+            Map<String, StreamStateHandle> prunedSstFiles = null;
+            int removedCount = 0;
+            for (Map.Entry<Long, Collection<HandleAndLocalPath>> entry :
+                    currentUploadedSstFiles.entrySet()) {
+                // Iterate checkpoints in ascending order of checkpoint id.
+                if (entry.getKey() == lastCompletedCheckpoint) {
+                    // The first checkpoint's uploaded sst files are all 
included.
+                    prunedSstFiles =
+                            entry.getValue().stream()
                                     .collect(
                                             Collectors.toMap(
                                                     
HandleAndLocalPath::getLocalPath,
-                                                    
HandleAndLocalPath::getHandle))
-                            : Collections.emptyMap();
+                                                    
HandleAndLocalPath::getHandle));
+                } else if (prunedSstFiles == null) {
+                    // The last completed checkpoint's uploaded sst files do 
not exist.
+                    // So we skip the pruning process.
+                    break;
+                } else if (!prunedSstFiles.isEmpty()) {
+                    // Prune sst files which have been re-uploaded in the 
following checkpoints.
+                    for (HandleAndLocalPath handleAndLocalPath : 
entry.getValue()) {
+                        if (!(handleAndLocalPath.getHandle()
+                                instanceof PlaceholderStreamStateHandle)) {
+                            // If it's not a placeholder handle, it means the 
sst file has been
+                            // re-uploaded in the following checkpoint.
+                            if 
(prunedSstFiles.remove(handleAndLocalPath.getLocalPath()) != null) {
+                                removedCount++;
+                            }
+                        }
+                    }
+                }
+            }
+            if (removedCount > 0 && LOG.isTraceEnabled()) {
+                LOG.trace(
+                        "Removed {} re-uploaded sst files from base file set 
for incremental "
+                                + "checkpoint. Base checkpoint id: {}",
+                        removedCount,
+                        currentUploadedSstFiles.firstKey());
+            }
+            return (prunedSstFiles != null && !prunedSstFiles.isEmpty())
+                    ? Collections.unmodifiableMap(prunedSstFiles)
+                    : Collections.emptyMap();
         }
 
         protected Optional<StreamStateHandle> getUploaded(String filename) {
@@ -429,5 +502,10 @@ public abstract class RocksDBSnapshotStrategyBase<K, R 
extends SnapshotResources
                 return Optional.empty();
             }
         }
+
+        @Override
+        public String toString() {
+            return "PreviousSnapshot{" + "confirmedSstFiles=" + 
confirmedSstFiles + '}';
+        }
     }
 }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/snapshot/RocksIncrementalSnapshotStrategy.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/snapshot/RocksIncrementalSnapshotStrategy.java
index 5ae5cdd904f..af81c29fc82 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/snapshot/RocksIncrementalSnapshotStrategy.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/snapshot/RocksIncrementalSnapshotStrategy.java
@@ -196,30 +196,29 @@ public class RocksIncrementalSnapshotStrategy<K>
             long checkpointId, @Nonnull List<StateMetaInfoSnapshot> 
stateMetaInfoSnapshots) {
 
         final long lastCompletedCheckpoint;
-        final Collection<HandleAndLocalPath> confirmedSstFiles;
+        final SortedMap<Long, Collection<HandleAndLocalPath>> 
currentUploadedSstFiles;
 
         // use the last completed checkpoint as the comparison base.
         synchronized (uploadedSstFiles) {
             lastCompletedCheckpoint = lastCompletedCheckpointId;
-            confirmedSstFiles = uploadedSstFiles.get(lastCompletedCheckpoint);
-            LOG.trace(
-                    "Use confirmed SST files for checkpoint {}: {}",
-                    checkpointId,
-                    confirmedSstFiles);
+            currentUploadedSstFiles =
+                    new 
TreeMap<>(uploadedSstFiles.tailMap(lastCompletedCheckpoint));
         }
+        PreviousSnapshot previousSnapshot =
+                new PreviousSnapshot(currentUploadedSstFiles, 
lastCompletedCheckpoint);
         LOG.trace(
                 "Taking incremental snapshot for checkpoint {}. Snapshot is 
based on last completed checkpoint {} "
                         + "assuming the following (shared) confirmed files as 
base: {}.",
                 checkpointId,
                 lastCompletedCheckpoint,
-                confirmedSstFiles);
+                previousSnapshot);
 
         // snapshot meta data to save
         for (Map.Entry<String, RocksDbKvStateInfo> stateMetaInfoEntry :
                 kvStateInformation.entrySet()) {
             
stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().metaInfo.snapshot());
         }
-        return new PreviousSnapshot(confirmedSstFiles);
+        return previousSnapshot;
     }
 
     /**
@@ -352,6 +351,7 @@ public class RocksIncrementalSnapshotStrategy<K>
                 List<Path> miscFilePaths = new ArrayList<>(files.length);
 
                 createUploadFilePaths(files, sstFiles, sstFilePaths, 
miscFilePaths);
+                LOG.info("Will re-use {} SST files. {}", sstFiles.size(), 
sstFiles);
 
                 final CheckpointedStateScope stateScope =
                         sharingFilesStrategy == 
SnapshotType.SharingFilesStrategy.NO_SHARING
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/snapshot/RocksIncrementalSnapshotStrategyTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/snapshot/RocksIncrementalSnapshotStrategyTest.java
index 9007a7da70a..e6a7d3fdb2f 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/snapshot/RocksIncrementalSnapshotStrategyTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/snapshot/RocksIncrementalSnapshotStrategyTest.java
@@ -96,6 +96,41 @@ class RocksIncrementalSnapshotStrategyTest {
         }
     }
 
+    @Test
+    void testCheckpointIsIncrementalWithLateNotification() throws Exception {
+
+        try (CloseableRegistry closeableRegistry = new CloseableRegistry();
+                RocksIncrementalSnapshotStrategy<?> checkpointSnapshotStrategy 
=
+                        createSnapshotStrategy()) {
+            FsCheckpointStreamFactory checkpointStreamFactory = 
createFsCheckpointStreamFactory();
+
+            // make checkpoint with id 1
+            snapshot(1L, checkpointSnapshotStrategy, checkpointStreamFactory, 
closeableRegistry);
+
+            // make checkpoint with id 2
+            snapshot(2L, checkpointSnapshotStrategy, checkpointStreamFactory, 
closeableRegistry);
+
+            // Late notify checkpoint with id 1
+            checkpointSnapshotStrategy.notifyCheckpointComplete(1L);
+
+            // make checkpoint with id 3, based on checkpoint 1
+            IncrementalRemoteKeyedStateHandle 
incrementalRemoteKeyedStateHandle3 =
+                    snapshot(
+                            3L,
+                            checkpointSnapshotStrategy,
+                            checkpointStreamFactory,
+                            closeableRegistry);
+
+            // Late notify checkpoint with id 2
+            checkpointSnapshotStrategy.notifyCheckpointComplete(2L);
+
+            // 3rd checkpoint is based on 1st checkpoint, BUT the 2nd 
checkpoint re-uploaded the 1st
+            // one, so it should be based on nothing, thus this is effectively 
a full checkpoint.
+            assertThat(incrementalRemoteKeyedStateHandle3.getStateSize())
+                    
.isEqualTo(incrementalRemoteKeyedStateHandle3.getCheckpointedSize());
+        }
+    }
+
     public RocksIncrementalSnapshotStrategy<?> createSnapshotStrategy()
             throws IOException, RocksDBException {
 

Reply via email to