StefanRRichter commented on a change in pull request #8263:
[FLINK-12296][StateBackend]Data loss silently in RocksDBStateBackend because of
local directory collision
URL: https://github.com/apache/flink/pull/8263#discussion_r278870885
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
##########
@@ -182,7 +186,14 @@ private SnapshotDirectory prepareLocalSnapshotDirectory(long checkpointId) throw
         if (localRecoveryConfig.isLocalRecoveryEnabled()) {
             // create a "permanent" snapshot directory for local recovery.
             LocalRecoveryDirectoryProvider directoryProvider = localRecoveryConfig.getLocalStateDirectoryProvider();
-            File directory = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId);
+            String operatorId = OperatorSubtaskDescriptionText.getOperatorIdByOperatorIdentifier(operatorIdentifier);
+            if (operatorId == null) {
+                operatorId = operatorIdentifier;
+                LOG.warn("Can not extract operatorId from {}, the expected format is {}.",
+                    operatorIdentifier,
+                    EXTRACT_OPERATOR_PATTERN.toString());
+            }
+            File directory = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId, operatorId);
             if (directory.exists()) {
Review comment:
As already commented on the JIRA: instead of applying all the changes in this
PR, wouldn't it be sufficient to remove the deletion of an existing checkpoint
root directory? That deletion looks like the actual cause of the problem to me.
What do you think? We could instead check for and delete an existing
subdirectory for the particular operator.
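
To make the suggestion concrete, here is a minimal sketch of what "delete only the operator's subdirectory" could look like. It is not the actual Flink code: the class `OperatorSnapshotDirs`, the method `prepareOperatorDirectory`, and the layout (one subdirectory per operator id under the checkpoint root) are all assumptions made for illustration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper, not part of Flink. */
public class OperatorSnapshotDirs {

    /**
     * Prepares an empty directory for one operator under the checkpoint root.
     * Only a stale subdirectory of this operator is deleted; the root and the
     * subdirectories of other operators are left untouched.
     */
    static Path prepareOperatorDirectory(Path checkpointRoot, String operatorId) throws IOException {
        // Assumed layout: <checkpointRoot>/<operatorId>
        Path operatorDir = checkpointRoot.resolve(operatorId);
        if (Files.exists(operatorDir)) {
            deleteRecursively(operatorDir);
        }
        return Files.createDirectories(operatorDir);
    }

    /** Deletes a directory tree, children before parents. */
    static void deleteRecursively(Path dir) throws IOException {
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            // Reverse order puts nested entries ahead of their parent directories.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }
}
```

With this shape, two operators in the same task that share a checkpoint root would no longer wipe each other's local state, because each one only ever clears its own subdirectory.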