Myasuka commented on a change in pull request #17774:
URL: https://github.com/apache/flink/pull/17774#discussion_r769441966



##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
##########
@@ -97,12 +98,23 @@
 
     /**
      * Stores the materialized sstable files from all snapshots that build the incremental history.
+     * Used to check whether {@link PlaceholderStreamStateHandle} can be sent or the original {@link
+     * StreamStateHandle} must be used.
      */
-    @Nonnull private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
+    @Nonnull private final SortedMap<Long, Set<StateHandleID>> uploadedStateIDs;

Review comment:
       I think `uploadedStateIDs` does not convey the exact meaning: it only contains the state handles of completed checkpoints. Artifacts uploaded during failed checkpoints have also been **uploaded**, but they are not in this collection.

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
##########
@@ -561,4 +587,37 @@ public void release() {
             }
         }
     }
+
+    private static class PreviousSnapshot {
+
+        @Nullable private final Set<StateHandleID> confirmedSstFiles;
+        private final Map<StateHandleID, StreamStateHandle> uploadedSstFiles;
+
+        private PreviousSnapshot(
+                @Nullable Set<StateHandleID> confirmedSstFiles,
+                @Nonnull Map<StateHandleID, StreamStateHandle> uploadedSstFiles) {
+            this.confirmedSstFiles = confirmedSstFiles;
+            this.uploadedSstFiles = Preconditions.checkNotNull(uploadedSstFiles);
+        }
+
+        private Optional<StreamStateHandle> getUploaded(StateHandleID stateHandleID) {
+            if (isConfirmed(stateHandleID)) {
+                // we introduce a placeholder state handle, that is replaced with the
+                // original from the shared state registry (created from a previous checkpoint)
+                return Optional.of(new PlaceholderStreamStateHandle());
+            } else {
+                // If the file was uploaded but not confirmed by JM the handle has to be resent
+                // because JM might lose it during changing the  leadership

Review comment:
       ```suggestion
                   // because JM might lose it during changing the leadership
   ```
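For illustration, the decision made by `PreviousSnapshot.getUploaded` above can be modeled in isolation. This is a minimal sketch, not the PR's code: plain `String`s stand in for `StateHandleID` and `StreamStateHandle`, the `PLACEHOLDER` constant stands in for `PlaceholderStreamStateHandle`, and the `else` branch (cut off in the hunk) is assumed to return the previously uploaded handle, or nothing if the file was never uploaded. `PreviousSnapshotSketch` is a hypothetical name.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

/** Hypothetical, simplified model of PreviousSnapshot#getUploaded (Strings replace Flink types). */
public class PreviousSnapshotSketch {

    /** Stand-in for PlaceholderStreamStateHandle: the JM resolves it from its shared state registry. */
    static final String PLACEHOLDER = "<placeholder>";

    /** SST file IDs confirmed by a completed checkpoint; null if nothing is confirmed yet. */
    private final Set<String> confirmedSstFiles;

    /** SST file ID -> handle from the previous upload, possibly not yet confirmed by the JM. */
    private final Map<String, String> uploadedSstFiles;

    PreviousSnapshotSketch(Set<String> confirmedSstFiles, Map<String, String> uploadedSstFiles) {
        this.confirmedSstFiles = confirmedSstFiles;
        this.uploadedSstFiles = Objects.requireNonNull(uploadedSstFiles);
    }

    private boolean isConfirmed(String id) {
        return confirmedSstFiles != null && confirmedSstFiles.contains(id);
    }

    /** Returns what to send for a file, or empty if the file must be uploaded now. */
    Optional<String> getUploaded(String id) {
        if (isConfirmed(id)) {
            // Confirmed: the JM already holds the original handle, so a placeholder suffices.
            return Optional.of(PLACEHOLDER);
        }
        // Uploaded but unconfirmed: resend the real handle, since the JM might have lost it
        // during a leadership change. Empty if the file was never uploaded.
        return Optional.ofNullable(uploadedSstFiles.get(id));
    }

    public static void main(String[] args) {
        Map<String, String> uploaded = new HashMap<>();
        uploaded.put("000007.sst", "chk-2/000007.sst");
        PreviousSnapshotSketch prev =
                new PreviousSnapshotSketch(Collections.singleton("000005.sst"), uploaded);

        System.out.println(prev.getUploaded("000005.sst")); // Optional[<placeholder>]
        System.out.println(prev.getUploaded("000007.sst")); // Optional[chk-2/000007.sst]
        System.out.println(prev.getUploaded("000009.sst")); // Optional.empty
    }
}
```

The three calls cover the three outcomes: confirmed (placeholder), uploaded but unconfirmed (resend the original handle), and unknown (upload required).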

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java
##########
@@ -72,7 +76,13 @@ public RocksDBStateUploader(int numberOfSnapshottingThreads) {
 
             for (Map.Entry<StateHandleID, CompletableFuture<StreamStateHandle>> entry :
                     futures.entrySet()) {
-                handles.put(entry.getKey(), entry.getValue().get());
+                StreamStateHandle handle = entry.getValue().get();
+                LOG.debug(
+                        "Uploaded {} ({} bytes) for key: {}",

Review comment:
       ```suggestion
                           "Uploaded {} ({} bytes) for state handle id: {}",
   ```
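The suggested wording reflects that each key in `futures` is a `StateHandleID`. A minimal sketch of the collection loop follows, assuming (since the hunk is cut off) that the log arguments are the handle, its size, and the map key. `Handle` is a hypothetical stand-in for `StreamStateHandle`, and `System.out.printf` replaces the SLF4J `LOG.debug` call so the example has no dependencies.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Hypothetical sketch of waiting for per-file upload futures and logging each result. */
public class UploadLoopSketch {

    /** Minimal stand-in for StreamStateHandle: where the file was written and its size. */
    static final class Handle {
        final String location;
        final long stateSize;

        Handle(String location, long stateSize) {
            this.location = location;
            this.stateSize = stateSize;
        }
    }

    /** Blocks on every future and returns the handles keyed by state handle ID. */
    static Map<String, Handle> collect(Map<String, CompletableFuture<Handle>> futures)
            throws ExecutionException, InterruptedException {
        Map<String, Handle> handles = new HashMap<>(futures.size());
        for (Map.Entry<String, CompletableFuture<Handle>> entry : futures.entrySet()) {
            Handle handle = entry.getValue().get(); // waits for this upload to finish
            System.out.printf(
                    "Uploaded %s (%d bytes) for state handle id: %s%n",
                    handle.location, handle.stateSize, entry.getKey());
            handles.put(entry.getKey(), handle);
        }
        return handles;
    }

    public static void main(String[] args) throws Exception {
        Map<String, CompletableFuture<Handle>> futures = new HashMap<>();
        futures.put(
                "000011.sst",
                CompletableFuture.completedFuture(new Handle("shared/000011.sst", 4096L)));
        // Prints: Uploaded shared/000011.sst (4096 bytes) for state handle id: 000011.sst
        collect(futures);
    }
}
```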

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
##########
@@ -181,19 +194,19 @@ public IncrementalRocksDBSnapshotResources syncPrepareResources(long checkpointI
                 checkpointId,
                 checkpointStreamFactory,
                 snapshotResources.snapshotDirectory,
-                snapshotResources.baseSstFiles,
+                snapshotResources.previousSnapshot,
                 snapshotResources.stateMetaInfoSnapshots);
     }
 
     @Override
     public void notifyCheckpointComplete(long completedCheckpointId) {
-        synchronized (materializedSstFiles) {
+        synchronized (uploadedStateIDs) {
             // FLINK-23949: materializedSstFiles.keySet().contains(completedCheckpointId) make sure

Review comment:
       Since `materializedSstFiles` has been removed in this PR, we should also update the comments that still refer to it (e.g. the FLINK-23949 note).

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
##########
@@ -181,19 +194,19 @@ public IncrementalRocksDBSnapshotResources syncPrepareResources(long checkpointI
                 checkpointId,
                 checkpointStreamFactory,
                 snapshotResources.snapshotDirectory,
-                snapshotResources.baseSstFiles,
+                snapshotResources.previousSnapshot,
                 snapshotResources.stateMetaInfoSnapshots);
     }
 
     @Override
     public void notifyCheckpointComplete(long completedCheckpointId) {
-        synchronized (materializedSstFiles) {
+        synchronized (uploadedStateIDs) {
             // FLINK-23949: materializedSstFiles.keySet().contains(completedCheckpointId) make sure
             // the notified checkpointId is not a savepoint, otherwise next checkpoint will
             // degenerate into a full checkpoint
             if (completedCheckpointId > lastCompletedCheckpointId
-                    && materializedSstFiles.keySet().contains(completedCheckpointId)) {
-                materializedSstFiles
+                    && uploadedStateIDs.keySet().contains(completedCheckpointId)) {

Review comment:
       Though the previous change did not cover this, `uploadedStateIDs.containsKey(completedCheckpointId)` may be better here.
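A minimal sketch of the guard written with `containsKey`, under stated assumptions: the pruning step after the condition is cut off in the hunk, so dropping older entries via `headMap(...).clear()` is assumed here, and `NotifyCompleteSketch`, `onSnapshot`, and `trackedCheckpoints` are hypothetical names. It shows why an ID that was never recorded in the map (such as a savepoint, the FLINK-23949 case) leaves the incremental history untouched.

```java
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical sketch of notifyCheckpointComplete using containsKey. */
public class NotifyCompleteSketch {

    /** Checkpoint ID -> SST file IDs recorded when that checkpoint's snapshot was taken. */
    private final SortedMap<Long, Set<String>> uploadedStateIDs = new TreeMap<>();

    private long lastCompletedCheckpointId = -1L;

    /** Records the SST files of a checkpoint; savepoints are assumed never to be recorded. */
    void onSnapshot(long checkpointId, Set<String> sstFileIds) {
        synchronized (uploadedStateIDs) {
            uploadedStateIDs.put(checkpointId, sstFileIds);
        }
    }

    void notifyCheckpointComplete(long completedCheckpointId) {
        synchronized (uploadedStateIDs) {
            // containsKey is a direct lookup; keySet().contains gives the same answer via a view.
            // An ID missing from the map (e.g. a savepoint) must not prune the history.
            if (completedCheckpointId > lastCompletedCheckpointId
                    && uploadedStateIDs.containsKey(completedCheckpointId)) {
                // Assumed pruning step: forget checkpoints older than the completed one.
                uploadedStateIDs.headMap(completedCheckpointId).clear();
                lastCompletedCheckpointId = completedCheckpointId;
            }
        }
    }

    /** Copy of the currently tracked checkpoint IDs, for inspection. */
    Set<Long> trackedCheckpoints() {
        synchronized (uploadedStateIDs) {
            return new TreeSet<>(uploadedStateIDs.keySet());
        }
    }

    public static void main(String[] args) {
        NotifyCompleteSketch s = new NotifyCompleteSketch();
        s.onSnapshot(1L, Set.of("a.sst"));
        s.onSnapshot(2L, Set.of("a.sst", "b.sst"));
        s.notifyCheckpointComplete(5L); // not in the map (e.g. a savepoint): ignored
        System.out.println(s.trackedCheckpoints()); // [1, 2]
        s.notifyCheckpointComplete(2L);
        System.out.println(s.trackedCheckpoints()); // [2]
    }
}
```

Both `containsKey` and `keySet().contains` return the same result for a `TreeMap`; the former simply states the intent without going through the key-set view.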

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
##########
@@ -97,12 +98,23 @@
 
     /**
      * Stores the materialized sstable files from all snapshots that build the incremental history.
+     * Used to check whether {@link PlaceholderStreamStateHandle} can be sent or the original {@link
+     * StreamStateHandle} must be used.
      */
-    @Nonnull private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
+    @Nonnull private final SortedMap<Long, Set<StateHandleID>> uploadedStateIDs;
+
+    /**
+     * Last uploaded but potentially not confirmed SST files. Used if {@link #uploadedStateIDs}
+     * doesn't contain the corresponding {@link StateHandleID}.
+     */
+    @Nonnull private final Map<StateHandleID, StreamStateHandle> lastUploadedSstFiles;

Review comment:
       After reviewing the changes to the RocksDB incremental checkpoint part, it seems the current implementation cannot handle continuous checkpoint failures, which are actually common in production environments. If the job fails to complete a checkpoint in the first two rounds, it will upload the artifacts again in the 3rd round.
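The scenario can be reproduced with a small model. This is a hypothetical sketch, not the PR's code. It assumes that after each snapshot `lastUploadedSstFiles` is replaced by only the files newly uploaded in that round (handles that were merely resent are not carried forward), and that every checkpoint fails, so nothing is ever confirmed. The `carryForwardResent` flag is an illustrative variant for comparison only; neither variant is taken from the PR.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical model of repeated checkpoint failures (no checkpoint is ever confirmed). */
public class ContinuousFailureSketch {

    /** SST IDs confirmed by a completed checkpoint; stays empty because every checkpoint fails. */
    private final Set<String> confirmed = new HashSet<>();

    /** Handles kept from the previous round, playing the role of lastUploadedSstFiles. */
    private Map<String, String> lastUploadedSstFiles = new HashMap<>();

    /** Illustrative variant: also keep handles that were resent rather than re-uploaded. */
    private final boolean carryForwardResent;

    ContinuousFailureSketch(boolean carryForwardResent) {
        this.carryForwardResent = carryForwardResent;
    }

    /** Takes a snapshot of the given SST files and returns those that had to be uploaded. */
    Set<String> snapshot(long checkpointId, Set<String> sstFiles) {
        Map<String, String> sent = new HashMap<>();
        Map<String, String> uploaded = new HashMap<>();
        for (String id : sstFiles) {
            if (confirmed.contains(id)) {
                continue; // would be sent as a placeholder, nothing to upload
            }
            String previous = lastUploadedSstFiles.get(id);
            if (previous != null) {
                sent.put(id, previous); // reuse the unconfirmed handle without uploading
            } else {
                String handle = "chk-" + checkpointId + "/" + id; // simulated upload
                uploaded.put(id, handle);
                sent.put(id, handle);
            }
        }
        // With carryForwardResent == false (the assumed behavior), only new uploads are kept.
        lastUploadedSstFiles = carryForwardResent ? sent : uploaded;
        return new TreeSet<>(uploaded.keySet());
    }

    public static void main(String[] args) {
        Set<String> files = Set.of("a.sst", "b.sst"); // unchanged SST files in every round
        ContinuousFailureSketch model = new ContinuousFailureSketch(false);
        System.out.println(model.snapshot(1, files)); // [a.sst, b.sst]
        System.out.println(model.snapshot(2, files)); // []
        System.out.println(model.snapshot(3, files)); // [a.sst, b.sst]
    }
}
```

Under the assumed behavior, round 2 resends the round-1 handles without uploading anything new, so `lastUploadedSstFiles` ends up empty and round 3 uploads both files again, which matches the concern above. With `carryForwardResent = true`, round 3 would upload nothing.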



