rkhachatryan commented on a change in pull request #17774:
URL: https://github.com/apache/flink/pull/17774#discussion_r767862072
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
##########
@@ -466,16 +496,11 @@ private void createUploadFilePaths(
final StateHandleID stateHandleID = new StateHandleID(fileName);
if (fileName.endsWith(SST_FILE_SUFFIX)) {
- final boolean existsAlready =
- baseSstFiles != null && baseSstFiles.contains(stateHandleID);
-
- if (existsAlready) {
- // we introduce a placeholder state handle, that is replaced with the
- // original from the shared state registry (created from a previous
- // checkpoint)
- sstFiles.put(stateHandleID, new PlaceholderStreamStateHandle());
+ Optional<StreamStateHandle> uploaded = previousSnapshot.get(stateHandleID);
+ if (uploaded.isPresent()) {
+ sstFiles.put(stateHandleID, uploaded.get());
} else {
- sstFilePaths.put(stateHandleID, filePath);
+ sstFilePaths.put(stateHandleID, filePath); // re-upload
}
Review comment:
I'd like to keep that method separate; there are already several
questions about its logic, which is not straightforward.
I think the outer logic (`createUploadFilePaths`) is independent of it:
re-upload an SST file if the previous snapshot doesn't have it in some form.
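
A minimal sketch of that outer decision, to make the rule concrete. It is not the actual Flink code: `UploadDecisionSketch`, `PreviousSnapshot`, and `classify` are hypothetical stand-ins, state handles are modeled as plain strings, and every input file is assumed to be an SST file.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class UploadDecisionSketch {

    /** Hypothetical stand-in for the previous snapshot's view of already uploaded files. */
    interface PreviousSnapshot {
        /** Returns the handle of an already uploaded file, or empty if it must be uploaded. */
        Optional<String> get(String fileName);
    }

    /**
     * Splits local SST files into those whose handle can be reused and those
     * that have to be (re-)uploaded. How the lookup decides is left entirely
     * to the PreviousSnapshot implementation.
     */
    static void classify(
            Map<String, Path> localSstFiles,
            PreviousSnapshot previousSnapshot,
            Map<String, String> reusedHandles,
            Map<String, Path> pathsToUpload) {
        for (Map.Entry<String, Path> entry : localSstFiles.entrySet()) {
            Optional<String> uploaded = previousSnapshot.get(entry.getKey());
            if (uploaded.isPresent()) {
                // The previous snapshot has the file in some form: reuse its handle.
                reusedHandles.put(entry.getKey(), uploaded.get());
            } else {
                // Not known to the previous snapshot: schedule a re-upload.
                pathsToUpload.put(entry.getKey(), entry.getValue());
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Path> local = new HashMap<>();
        local.put("000001.sst", Paths.get("/tmp/db/000001.sst"));
        local.put("000002.sst", Paths.get("/tmp/db/000002.sst"));

        // Pretend only 000001.sst was uploaded by an earlier checkpoint.
        PreviousSnapshot previous =
                name -> name.equals("000001.sst") ? Optional.of("handle-1") : Optional.empty();

        Map<String, String> reused = new HashMap<>();
        Map<String, Path> toUpload = new HashMap<>();
        classify(local, previous, reused, toUpload);

        System.out.println("reused: " + reused.keySet());
        System.out.println("to upload: " + toUpload.keySet());
    }
}
```

The caller only sees "present" or "absent", so the lookup logic can change without touching this loop.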