fredia commented on code in PR #25090:
URL: https://github.com/apache/flink/pull/25090#discussion_r1677754547
##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java:
##########
@@ -319,9 +333,106 @@ private static void verifyCheckpointExistOrWaitDeleted(
Thread.sleep(500L);
waited += 500L;
// Or timeout
- assertThat(waited).isLessThan(DELETE_TIMEOUT_MILLS);
+ if (waited >= DELETE_TIMEOUT_MILLS) {
+ assertThat(fs.exists(checkpointDir)).isFalse();
+ assertThat(fs.listStatus(sharedFile)).isNullOrEmpty();
+
assertThat(fs.listStatus(taskOwnedFile)).isNullOrEmpty();
+ }
}
}
}
}
+
+ /**
+ * Traverse the checkpoint metadata and verify that all state handles are disposed.
+ *
+ * @param metadata the metadata to traverse.
+ * @return true if all corresponding files are deleted.
+ */
+ private static boolean verifyCheckpointDisposed(CheckpointMetadata
metadata) {
+ AtomicBoolean disposed = new AtomicBoolean(true);
+ for (OperatorState operatorState : metadata.getOperatorStates()) {
+ for (OperatorSubtaskState subtaskState :
operatorState.getStates()) {
+ // Check keyed state handle
+ List<KeyedStateHandle> keyedStateHandles =
+ new ArrayList<>(subtaskState.getManagedKeyedState());
+ for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
+ assertThat(keyedStateHandle)
+
.isInstanceOf(IncrementalRemoteKeyedStateHandle.class);
+ ((IncrementalRemoteKeyedStateHandle) keyedStateHandle)
+ .streamSubHandles()
+ .forEach(
+ handle -> {
+ try {
+ if (handle instanceof
FileStateHandle) {
Review Comment:
Should `SegmentFileStateHandle` also be checked here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]