This is an automated email from the ASF dual-hosted git repository. zakelly pushed a commit to branch release-1.20 in repository https://gitbox.apache.org/repos/asf/flink.git
commit f14546cba1a3edd2dd6f93740f122dbc43ce101f Author: Zakelly <zakelly....@gmail.com> AuthorDate: Wed Jul 24 11:38:23 2024 +0800 [FLINK-35835][test] Make file-merging test tolerate the scenario that source does not produce any record --- .../SnapshotFileMergingCompatibilityITCase.java | 77 +++++++++++++--------- 1 file changed, 47 insertions(+), 30 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java index 172a1b62eea..1030c314405 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java @@ -287,7 +287,7 @@ public class SnapshotFileMergingCompatibilityITCase extends TestLogger { } } - private static void verifyCheckpointExistOrWaitDeleted( + private void verifyCheckpointExistOrWaitDeleted( String checkpointPath, TernaryBoolean exist, boolean fileMergingEnabled, @@ -307,8 +307,7 @@ public class SnapshotFileMergingCompatibilityITCase extends TestLogger { if (exist.equals(TernaryBoolean.TRUE)) { // should exist, just check assertThat(fs.exists(checkpointDir)).isTrue(); - assertThat(fs.listStatus(sharedFile) != null && fs.listStatus(sharedFile).length > 0) - .isTrue(); + assertThat(metadata == null || verifyCheckpointExist(metadata, true)).isTrue(); // Since there is no exclusive state, we should consider fileMergingEnabled. assertThat( fs.listStatus(taskOwnedFile) != null @@ -322,7 +321,7 @@ public class SnapshotFileMergingCompatibilityITCase extends TestLogger { try { fileExist = (fs.exists(checkpointDir) - || (metadata != null && !verifyCheckpointDisposed(metadata)) + || (metadata != null && !verifyCheckpointExist(metadata, false)) || !verifyCheckpointNoDirectory(fs, sharedFile, taskOwnedFile)); } catch (IOException e) { // Sometimes it may happen that the files are being deleted while we list them, @@ -344,13 +343,14 @@ public class SnapshotFileMergingCompatibilityITCase extends TestLogger { } /** - * Traverse the checkpoint metadata and verify all the state handle is disposed. + * Traverse the checkpoint metadata and verify all the state handle exist or be disposed. * * @param metadata the metadata to traverse. + * @param exist all corresponding files of the state handle should exist * @return true if all corresponding files are deleted. */ - private static boolean verifyCheckpointDisposed(CheckpointMetadata metadata) { - AtomicBoolean disposed = new AtomicBoolean(true); + private boolean verifyCheckpointExist(CheckpointMetadata metadata, boolean exist) { + AtomicBoolean result = new AtomicBoolean(true); for (OperatorState operatorState : metadata.getOperatorStates()) { for (OperatorSubtaskState subtaskState : operatorState.getStates()) { // Check keyed state handle @@ -359,24 +359,38 @@ public class SnapshotFileMergingCompatibilityITCase extends TestLogger { for (KeyedStateHandle keyedStateHandle : keyedStateHandles) { assertThat(keyedStateHandle) .isInstanceOf(IncrementalRemoteKeyedStateHandle.class); - ((IncrementalRemoteKeyedStateHandle) keyedStateHandle) - .streamSubHandles() - .forEach( - handle -> { - try { - if (handle instanceof FileStateHandle) { - org.apache.flink.core.fs.Path p = - ((FileStateHandle) handle).getFilePath(); - if (p.getFileSystem().exists(p)) { - disposed.set(false); + boolean singleResult = + ((IncrementalRemoteKeyedStateHandle) keyedStateHandle) + .streamSubHandles() + .allMatch( + handle -> { + try { + if (handle instanceof FileStateHandle) { + org.apache.flink.core.fs.Path p = + ((FileStateHandle) handle) + .getFilePath(); + return exist == p.getFileSystem().exists(p); + } else if (handle + instanceof SegmentFileStateHandle) { + org.apache.flink.core.fs.Path p = + ((SegmentFileStateHandle) handle) + .getFilePath(); + return exist == p.getFileSystem().exists(p); + } + } catch (IOException e) { + log.warn( + "An error occurred when trying to check the file existence.", + e); + return false; } - } - } catch (IOException e) { - disposed.set(false); - } - }); + return true; + }); + result.set(result.get() || singleResult); + if (!result.get()) { + break; + } } - if (!disposed.get()) { + if (!result.get()) { break; } List<OperatorStateHandle> operatorStateHandles = @@ -388,27 +402,30 @@ public class SnapshotFileMergingCompatibilityITCase extends TestLogger { ((FileMergingOperatorStreamStateHandle) handle) .getSharedDirHandle() .getDirectory(); - if (p.getFileSystem().exists(p)) { - disposed.set(false); + if (exist != p.getFileSystem().exists(p)) { + result.set(false); } p = ((FileMergingOperatorStreamStateHandle) handle) .getTaskOwnedDirHandle() .getDirectory(); - if (p.getFileSystem().exists(p)) { - disposed.set(false); + if (exist != p.getFileSystem().exists(p)) { + result.set(false); } } catch (IOException e) { - disposed.set(false); + log.warn( + "An error occurred when trying to check the file existence.", + e); + result.set(false); } } } - if (!disposed.get()) { + if (!result.get()) { break; } } } - return disposed.get(); + return result.get(); } /** Verifying that there is no subdirectory under shared and task-owned directory. */