Myasuka commented on a change in pull request #17774:
URL: https://github.com/apache/flink/pull/17774#discussion_r769441966
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
##########
@@ -97,12 +98,23 @@
/**
* Stores the materialized sstable files from all snapshots that build the incremental history.
+ * Used to check whether {@link PlaceholderStreamStateHandle} can be sent or the original {@link
+ * StreamStateHandle} must be used.
*/
- @Nonnull private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
+ @Nonnull private final SortedMap<Long, Set<StateHandleID>> uploadedStateIDs;
Review comment:
I think `uploadedStateIDs` does not convey the exact meaning: it just contains all state handles of completed checkpoints. Artifacts uploaded by failed checkpoints are also **uploaded**, but they are not in this collection.
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
##########
@@ -561,4 +587,37 @@ public void release() {
}
}
}
+
+ private static class PreviousSnapshot {
+
+ @Nullable private final Set<StateHandleID> confirmedSstFiles;
+ private final Map<StateHandleID, StreamStateHandle> uploadedSstFiles;
+
+ private PreviousSnapshot(
+ @Nullable Set<StateHandleID> confirmedSstFiles,
+ @Nonnull Map<StateHandleID, StreamStateHandle> uploadedSstFiles) {
+ this.confirmedSstFiles = confirmedSstFiles;
+ this.uploadedSstFiles = Preconditions.checkNotNull(uploadedSstFiles);
+ }
+
+ private Optional<StreamStateHandle> getUploaded(StateHandleID stateHandleID) {
+ if (isConfirmed(stateHandleID)) {
+ // we introduce a placeholder state handle, that is replaced with the
+ // original from the shared state registry (created from a previous checkpoint)
+ return Optional.of(new PlaceholderStreamStateHandle());
+ } else {
+ // If the file was uploaded but not confirmed by JM the handle has to be resent
+ // because JM might lose it during changing the leadership
Review comment:
```suggestion
// because JM might lose it during changing the leadership
```
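The decision `getUploaded` makes can be modelled in isolation. The sketch below is a hypothetical, simplified stand-in, not the PR's code: plain `String`s replace `StateHandleID` and `StreamStateHandle`, the `PLACEHOLDER` constant stands in for `PlaceholderStreamStateHandle`, and the body of `isConfirmed` plus the `else` branch (which the quoted hunk cuts off) are assumptions about what the PR does.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

// Hypothetical, simplified model of PreviousSnapshot#getUploaded:
// plain Strings stand in for StateHandleID and StreamStateHandle.
final class PreviousSnapshotSketch {

    // Stand-in for the PlaceholderStreamStateHandle sent for confirmed files.
    static final String PLACEHOLDER = "placeholder";

    // Files referenced by a checkpoint the JM has confirmed; null if none yet.
    private final Set<String> confirmedSstFiles;
    // Handles from the last upload, possibly not confirmed by the JM.
    private final Map<String, String> uploadedSstFiles;

    PreviousSnapshotSketch(Set<String> confirmedSstFiles, Map<String, String> uploadedSstFiles) {
        this.confirmedSstFiles = confirmedSstFiles;
        this.uploadedSstFiles = Objects.requireNonNull(uploadedSstFiles);
    }

    // Assumed implementation: confirmed only if a confirmed set exists and contains the id.
    private boolean isConfirmed(String stateHandleId) {
        return confirmedSstFiles != null && confirmedSstFiles.contains(stateHandleId);
    }

    Optional<String> getUploaded(String stateHandleId) {
        if (isConfirmed(stateHandleId)) {
            // The JM already registered this file, so a placeholder suffices.
            return Optional.of(PLACEHOLDER);
        }
        // Uploaded but unconfirmed: resend the original handle, since the JM may have
        // lost it (e.g. after a leadership change). Empty means "upload the file again".
        return Optional.ofNullable(uploadedSstFiles.get(stateHandleId));
    }
}
```

In this model a confirmed file wins over an unconfirmed upload of the same id, and a file found in neither collection yields `Optional.empty()`, signalling that it must be uploaded.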
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java
##########
@@ -72,7 +76,13 @@ public RocksDBStateUploader(int numberOfSnapshottingThreads) {
for (Map.Entry<StateHandleID, CompletableFuture<StreamStateHandle>> entry : futures.entrySet()) {
- handles.put(entry.getKey(), entry.getValue().get());
+ StreamStateHandle handle = entry.getValue().get();
+ LOG.debug(
+ "Uploaded {} ({} bytes) for key: {}",
Review comment:
```suggestion
"Uploaded {} ({} bytes) for state handle id: {}",
```
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
##########
@@ -181,19 +194,19 @@ public IncrementalRocksDBSnapshotResources syncPrepareResources(long checkpointI
checkpointId,
checkpointStreamFactory,
snapshotResources.snapshotDirectory,
- snapshotResources.baseSstFiles,
+ snapshotResources.previousSnapshot,
snapshotResources.stateMetaInfoSnapshots);
}
@Override
public void notifyCheckpointComplete(long completedCheckpointId) {
- synchronized (materializedSstFiles) {
+ synchronized (uploadedStateIDs) {
// FLINK-23949: materializedSstFiles.keySet().contains(completedCheckpointId) make sure
Review comment:
Since `materializedSstFiles` has been removed in this PR, we should also update the related documentation, such as the comments that still refer to it.
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
##########
@@ -181,19 +194,19 @@ public IncrementalRocksDBSnapshotResources syncPrepareResources(long checkpointI
checkpointId,
checkpointStreamFactory,
snapshotResources.snapshotDirectory,
- snapshotResources.baseSstFiles,
+ snapshotResources.previousSnapshot,
snapshotResources.stateMetaInfoSnapshots);
}
@Override
public void notifyCheckpointComplete(long completedCheckpointId) {
- synchronized (materializedSstFiles) {
+ synchronized (uploadedStateIDs) {
// FLINK-23949: materializedSstFiles.keySet().contains(completedCheckpointId) make sure
// the notified checkpointId is not a savepoint, otherwise next checkpoint will
// degenerate into a full checkpoint
if (completedCheckpointId > lastCompletedCheckpointId
- && materializedSstFiles.keySet().contains(completedCheckpointId)) {
- materializedSstFiles
+ && uploadedStateIDs.keySet().contains(completedCheckpointId)) {
Review comment:
Although the previous change did not cover this, `uploadedStateIDs.containsKey(completedCheckpointId)` might be better.
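A minimal sketch of the guarded pruning with `containsKey`, assuming (as the FLINK-23949 comment implies) that only checkpoints, never savepoints, are recorded in the map. Everything here is hypothetical: `String`s stand in for `StateHandleID`, and `recordCheckpoint`/`checkpointIds` are helpers invented for illustration. Pruning via `headMap(id).clear()` is one way to drop older entries; it is not necessarily how the PR removes them.

```java
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch of the guarded pruning in notifyCheckpointComplete;
// Strings stand in for StateHandleID.
final class CompletedCheckpointPruningSketch {

    // checkpoint id -> ids of the SST files that checkpoint references
    private final SortedMap<Long, Set<String>> uploadedStateIDs = new TreeMap<>();
    private long lastCompletedCheckpointId = -1L;

    // Hypothetical helper: record the files of a (non-savepoint) checkpoint.
    void recordCheckpoint(long checkpointId, Set<String> sstFiles) {
        synchronized (uploadedStateIDs) {
            uploadedStateIDs.put(checkpointId, sstFiles);
        }
    }

    void notifyCheckpointComplete(long completedCheckpointId) {
        synchronized (uploadedStateIDs) {
            // containsKey(k) is equivalent to keySet().contains(k), just more direct.
            // Savepoints are never recorded in this sketch, so their ids fail this
            // check and cannot prune the incremental history.
            if (completedCheckpointId > lastCompletedCheckpointId
                    && uploadedStateIDs.containsKey(completedCheckpointId)) {
                // headMap is a live view: clearing it drops every older checkpoint.
                uploadedStateIDs.headMap(completedCheckpointId).clear();
                lastCompletedCheckpointId = completedCheckpointId;
            }
        }
    }

    // Hypothetical helper: copy of the currently tracked checkpoint ids.
    Set<Long> checkpointIds() {
        synchronized (uploadedStateIDs) {
            return new TreeSet<>(uploadedStateIDs.keySet());
        }
    }
}
```

With checkpoints 1, 2 and 4 recorded, a notification for an unrecorded id 3 (for example a savepoint) changes nothing, while a notification for 2 drops only checkpoint 1.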
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
##########
@@ -97,12 +98,23 @@
/**
* Stores the materialized sstable files from all snapshots that build the incremental history.
+ * Used to check whether {@link PlaceholderStreamStateHandle} can be sent or the original {@link
+ * StreamStateHandle} must be used.
*/
- @Nonnull private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
+ @Nonnull private final SortedMap<Long, Set<StateHandleID>> uploadedStateIDs;
+
+ /**
+ * Last uploaded but potentially not confirmed SST files. Used if {@link #uploadedStateIDs}
+ * doesn't contain the corresponding {@link StateHandleID}.
+ */
+ @Nonnull private final Map<StateHandleID, StreamStateHandle> lastUploadedSstFiles;
Review comment:
After reviewing the changes to the RocksDB incremental checkpoint part, it seems the current implementation cannot handle continuous checkpoint failures, which are actually common in production environments. If the job cannot complete a checkpoint in the first two rounds, it will upload the artifacts again in the 3rd round.
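The failure scenario can be reproduced with a toy model. This is a hypothetical sketch, not the PR's code, and it rests on one loudly stated assumption: no checkpoint is ever confirmed, and after each attempt the "last uploaded" map is replaced by only the handles that this attempt uploaded itself (handles it merely reused are not carried forward). `String`s stand in for `StateHandleID` and `StreamStateHandle`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of consecutive failed checkpoints. Assumption: nothing is ever
// confirmed, and after each attempt the "last uploaded" map is replaced by the handles
// that attempt uploaded itself. Strings stand in for StateHandleID/StreamStateHandle.
final class ContinuousFailureSketch {

    // Returns, for each attempt, the SST file ids that had to be uploaded.
    static List<Set<String>> simulate(List<Set<String>> filesPerAttempt) {
        Map<String, String> lastUploaded = new HashMap<>();
        List<Set<String>> uploadsPerAttempt = new ArrayList<>();
        for (int attempt = 0; attempt < filesPerAttempt.size(); attempt++) {
            Map<String, String> uploadedNow = new HashMap<>();
            for (String file : filesPerAttempt.get(attempt)) {
                if (!lastUploaded.containsKey(file)) {
                    // Not confirmed and not remembered from the previous attempt: upload.
                    uploadedNow.put(file, "handle-" + file + "-attempt-" + attempt);
                }
            }
            uploadsPerAttempt.add(new TreeSet<>(uploadedNow.keySet()));
            // Only this attempt's own uploads survive into the next attempt.
            lastUploaded = uploadedNow;
        }
        return uploadsPerAttempt;
    }
}
```

Under this assumption, three failing attempts over the files `{A, B}`, `{A, B, C}` and `{A, B, C}` upload `{A, B}`, then `{C}`, and then `{A, B}` again in the third round. Whether keeping older unconfirmed handles around would be safe also depends on whether the JM may already have discarded them, which this model ignores.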
--
This is an automated message from the Apache Git Service.