pnowojski commented on a change in pull request #17774:
URL: https://github.com/apache/flink/pull/17774#discussion_r767581589
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
##########
@@ -97,12 +98,20 @@
/**
* Stores the materialized sstable files from all snapshots that build the incremental history.
+ * Used to check whether {@link PlaceholderStreamStateHandle} can be send or the original {@link
+ * StreamStateHandle} must be used.
Review comment:
Can you explain when we should send the placeholder handles and when the original ones? (Maybe put it into the Javadoc?)
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
##########
@@ -466,16 +496,11 @@ private void createUploadFilePaths(
final StateHandleID stateHandleID = new StateHandleID(fileName);
if (fileName.endsWith(SST_FILE_SUFFIX)) {
- final boolean existsAlready =
- baseSstFiles != null && baseSstFiles.contains(stateHandleID);
-
- if (existsAlready) {
- // we introduce a placeholder state handle, that is replaced with the
- // original from the shared state registry (created from a previous
- // checkpoint)
- sstFiles.put(stateHandleID, new PlaceholderStreamStateHandle());
+ Optional<StreamStateHandle> uploaded = previousSnapshot.get(stateHandleID);
+ if (uploaded.isPresent()) {
+ sstFiles.put(stateHandleID, uploaded.get());
} else {
- sstFilePaths.put(stateHandleID, filePath);
+ sstFilePaths.put(stateHandleID, filePath); // re-upload
}
Review comment:
Maybe inline the `previousSnapshot.get()` logic here, but extract this whole snippet into a `handleSstFile(...)` method? I have a feeling that `previousSnapshot.get()` is too intertwined with the logic here, and at the same time the code is not long enough to require splitting into even smaller methods.
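A hedged sketch of what the suggested extraction might look like: the whole per-file decision lives in one `handleSstFile(...)` method, and the previous-snapshot lookup is inlined instead of being hidden behind `previousSnapshot.get()`. The class name `SstFileHandling`, the use of `String` for `StateHandleID`, `Object` for `StreamStateHandle`, and the `PLACEHOLDER` sentinel (standing in for `new PlaceholderStreamStateHandle()`) are all assumptions for illustration, not Flink API.

```java
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of the suggested handleSstFile(...) extraction; not Flink code. */
final class SstFileHandling {
    /** Stand-in for new PlaceholderStreamStateHandle(). */
    static final Object PLACEHOLDER = new Object();

    private final Set<String> confirmedSstFiles; // null if nothing was confirmed yet
    private final Map<String, Object> uploadedSstFiles; // uploaded, possibly unconfirmed

    final Map<String, Object> sstFiles = new HashMap<>(); // handles sent without uploading
    final Map<String, Path> sstFilePaths = new HashMap<>(); // files still to be uploaded

    SstFileHandling(Set<String> confirmedSstFiles, Map<String, Object> uploadedSstFiles) {
        this.confirmedSstFiles = confirmedSstFiles;
        this.uploadedSstFiles = uploadedSstFiles;
    }

    /** The whole per-file decision in one method, with the previous-snapshot lookup inlined. */
    void handleSstFile(String stateHandleId, Path filePath) {
        Object uploaded = uploadedSstFiles.get(stateHandleId);
        if (confirmedSstFiles != null && confirmedSstFiles.contains(stateHandleId)) {
            // confirmed earlier: the shared state registry can resolve a placeholder
            sstFiles.put(stateHandleId, PLACEHOLDER);
        } else if (uploaded != null) {
            // uploaded but not confirmed yet: send the original handle
            sstFiles.put(stateHandleId, uploaded);
        } else {
            // unknown to the previous snapshot: (re-)upload
            sstFilePaths.put(stateHandleId, filePath);
        }
    }
}
```

With this shape, `PreviousSnapshot` would only need to carry the two collections, and the caller reads top to bottom without jumping into a helper.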
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
##########
@@ -561,4 +586,37 @@ public void release() {
}
}
}
+
+ private static class PreviousSnapshot {
+
+ @Nullable private final Set<StateHandleID> confirmedSstFiles;
+ private final Map<StateHandleID, StreamStateHandle> uploadedSstFiles;
+
+ private PreviousSnapshot(
+ @Nullable Set<StateHandleID> confirmedSstFiles,
+ @Nonnull Map<StateHandleID, StreamStateHandle> uploadedSstFiles) {
+ this.confirmedSstFiles = confirmedSstFiles;
+ this.uploadedSstFiles = Preconditions.checkNotNull(uploadedSstFiles);
+ }
+
+ private Optional<StreamStateHandle> get(StateHandleID stateHandleID) {
Review comment:
1. Maybe rename it to `getAlreadyUploadedHandles()`?
2. Add a Javadoc explaining this method?
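A sketch of how the renamed method and its Javadoc might read, assuming the lookup rule implied by the diff: a file of a confirmed checkpoint can be sent as a placeholder, because the old code comment says the placeholder "is replaced with the original from the shared state registry"; a file that was uploaded but not confirmed yet presumably has to be sent with its original handle, since the registry may not have it. `PreviousSnapshotSketch` and `Handle` are hypothetical simplified stand-ins for `PreviousSnapshot` and `StreamStateHandle`, and `String` replaces `StateHandleID`.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

/** Hypothetical stand-in for Flink's StreamStateHandle. */
final class Handle {
    final String location;
    final boolean placeholder;

    Handle(String location, boolean placeholder) {
        this.location = location;
        this.placeholder = placeholder;
    }
}

/** Simplified stand-in for the PR's PreviousSnapshot; String replaces StateHandleID. */
final class PreviousSnapshotSketch {
    private final Set<String> confirmedSstFiles; // null if no checkpoint was confirmed yet
    private final Map<String, Handle> uploadedSstFiles; // may also contain confirmed files

    PreviousSnapshotSketch(Set<String> confirmedSstFiles, Map<String, Handle> uploadedSstFiles) {
        this.confirmedSstFiles = confirmedSstFiles;
        this.uploadedSstFiles = Objects.requireNonNull(uploadedSstFiles);
    }

    /**
     * Returns a handle that can be sent instead of uploading the given SST file again.
     *
     * <ul>
     *   <li>File of a confirmed checkpoint: a placeholder, which is replaced with the original
     *       handle from the shared state registry.
     *   <li>File uploaded for a checkpoint that is not confirmed yet: the original handle,
     *       because the registry may not have registered it.
     *   <li>Anything else: empty, so the caller has to upload the file.
     * </ul>
     */
    Optional<Handle> getAlreadyUploadedHandles(String stateHandleId) {
        if (confirmedSstFiles != null && confirmedSstFiles.contains(stateHandleId)) {
            return Optional.of(new Handle(stateHandleId, true));
        }
        return Optional.ofNullable(uploadedSstFiles.get(stateHandleId));
    }
}
```

Checking the confirmed set first matters because the uploaded map may also contain files that are already confirmed.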
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
##########
@@ -97,12 +98,20 @@
/**
* Stores the materialized sstable files from all snapshots that build the incremental history.
+ * Used to check whether {@link PlaceholderStreamStateHandle} can be send or the original {@link
+ * StreamStateHandle} must be used.
*/
@Nonnull private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
+ /** Uploaded but not yet confirmed SST files. Used to avoid re-uploading. */
+ @Nonnull private final Map<StateHandleID, StreamStateHandle> lastUploadedSstFiles;
Review comment:
Is the Javadoc accurate? Shouldn't it be "uploaded but potentially not yet confirmed SST files"? It looks like those files could already be confirmed.
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
##########
@@ -453,6 +470,19 @@ private void uploadSstFiles(
miscFiles.putAll(
stateUploader.uploadFilesToCheckpointFs(
miscFilePaths, checkpointStreamFactory, snapshotCloseableRegistry));
+
+ synchronized (materializedSstFiles) {
+ // ignore an older upload if it completed after a newer one has completed
+ if (checkpointId > lastCheckpointIdUploadedSst) {
+ lastCheckpointIdUploadedSst = checkpointId;
+ lastUploadedSstFiles.clear();
+ LOG.trace(
+ "Update lastUploadedSstFiles for checkpoint {}: {}",
+ checkpointId,
+ sstFiles);
+ lastUploadedSstFiles.putAll(sstFiles);
+ }
+ }
Review comment:
It's a bit confusing that you have this logic here, while `materializedSstFiles` is updated right after the `uploadSstFiles()` call. Could we merge those two synchronized sections?
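A hedged sketch of the merged critical section: one method records the upload result under a single lock, adding the checkpoint's files to `materializedSstFiles` and, only for a newer checkpoint, replacing the last-uploaded map (the same guard as in the diff). It also uses the Javadoc wording proposed in the earlier comment. `UploadBookkeeping`, `onSstFilesUploaded`, the `-1` initial value, and the simplified types (`String` for `StateHandleID`, `Object` for `StreamStateHandle`) are assumptions.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical holder; field names follow the diff, types are simplified. */
final class UploadBookkeeping {
    /** SST files per checkpoint that is confirmed or still awaiting confirmation. */
    private final SortedMap<Long, Set<String>> materializedSstFiles = new TreeMap<>();

    /** Uploaded, but potentially not yet confirmed, SST files. Used to avoid re-uploading. */
    private final Map<String, Object> lastUploadedSstFiles = new HashMap<>();

    private long lastCheckpointIdUploadedSst = -1L; // assumed initial value

    /** Single critical section replacing the two separate synchronized blocks. */
    void onSstFilesUploaded(long checkpointId, Map<String, Object> sstFiles) {
        synchronized (materializedSstFiles) {
            materializedSstFiles.put(checkpointId, new HashSet<>(sstFiles.keySet()));
            // ignore an older upload if it completed after a newer one has completed
            if (checkpointId > lastCheckpointIdUploadedSst) {
                lastCheckpointIdUploadedSst = checkpointId;
                lastUploadedSstFiles.clear();
                lastUploadedSstFiles.putAll(sstFiles);
            }
        }
    }

    /** Snapshot copy of the last uploaded files, taken under the same lock. */
    Map<String, Object> lastUploaded() {
        synchronized (materializedSstFiles) {
            return new HashMap<>(lastUploadedSstFiles);
        }
    }

    /** Checkpoint ids currently tracked in materializedSstFiles. */
    Set<Long> trackedCheckpoints() {
        synchronized (materializedSstFiles) {
            return new HashSet<>(materializedSstFiles.keySet());
        }
    }
}
```

Keeping both updates behind one lock means a reader never observes `materializedSstFiles` and the last-uploaded map describing different checkpoints.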
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1211,6 +1211,8 @@ private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint)
final long checkpointId = pendingCheckpoint.getCheckpointId();
Review comment:
In the last commit message:
> For example, if a task sent state for the first time and then failed without completing checkpoint and restarting JM;
Do you mean a JM failure, or a TM/task failure? In other words, did you mean:
> For example, if a task sent state for the first time and then JM failed without completing checkpoint and restarting JM
Can you elaborate on what the scenario is?
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
##########
@@ -258,30 +268,37 @@ private SnapshotDirectory prepareLocalSnapshotDirectory(long checkpointId) throw
}
Review comment:
Shouldn't `RocksIncrementalSnapshotStrategy#notifyCheckpointAborted` be changed as well? Aborted files could now be re-used too, right? They are still "materialised".
Edit: actually, I think the logic _maybe_ is correct, but something feels inconsistent here. It's really hard for me to name what is inside `materializedSstFiles`: it holds confirmed files and also those awaiting confirmation, yet aborted files are removed from it.
What is the purpose of `RocksIncrementalSnapshotStrategy#notifyCheckpointAborted` anyway? Isn't it dead code? As far as I can tell, it only prevents memory leaks if checkpoints keep getting aborted. If we removed it, we wouldn't need the `lastUploadedSstFiles` field; `lastUploadedCheckpointId` + `materializedSstFiles` would be enough (and less confusing).
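To make the memory-leak argument concrete, here is a simplified model, assuming that completing a checkpoint drops the `materializedSstFiles` entries of all older checkpoints while aborting a checkpoint drops only that checkpoint's entry. It is not the actual Flink code: `MaterializedFilesModel`, `onUploaded`, `trackedCheckpoints`, and the `handleAborts` switch (which simulates removing `notifyCheckpointAborted`) are hypothetical.

```java
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

/** Simplified model of materializedSstFiles pruning; hypothetical, not Flink code. */
final class MaterializedFilesModel {
    private final SortedMap<Long, Set<String>> materializedSstFiles = new TreeMap<>();
    private final boolean handleAborts; // false simulates removing notifyCheckpointAborted
    private long lastCompletedCheckpointId = -1L; // assumed initial value

    MaterializedFilesModel(boolean handleAborts) {
        this.handleAborts = handleAborts;
    }

    synchronized void onUploaded(long checkpointId, Set<String> sstFiles) {
        materializedSstFiles.put(checkpointId, sstFiles);
    }

    synchronized void notifyCheckpointComplete(long completedCheckpointId) {
        if (completedCheckpointId > lastCompletedCheckpointId) {
            // drop entries of all older checkpoints, aborted or not; the headMap view
            // is backed by the TreeMap, so clear() removes them from the map itself
            materializedSstFiles.headMap(completedCheckpointId).clear();
            lastCompletedCheckpointId = completedCheckpointId;
        }
    }

    synchronized void notifyCheckpointAborted(long abortedCheckpointId) {
        if (handleAborts) {
            materializedSstFiles.remove(abortedCheckpointId);
        }
    }

    synchronized int trackedCheckpoints() {
        return materializedSstFiles.size();
    }
}
```

In this model, without the abort path, entries of aborted checkpoints pile up only until the next checkpoint completes and prunes them, so the growth is unbounded only while every checkpoint keeps getting aborted.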
--
This is an automated message from the Apache Git Service.