This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f14546cba1a3edd2dd6f93740f122dbc43ce101f
Author: Zakelly <zakelly....@gmail.com>
AuthorDate: Wed Jul 24 11:38:23 2024 +0800

    [FLINK-35835][test] Make file-merging test tolerate the scenario that source does not produce any record
---
 .../SnapshotFileMergingCompatibilityITCase.java    | 77 +++++++++++++---------
 1 file changed, 47 insertions(+), 30 deletions(-)

diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java
index 172a1b62eea..1030c314405 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java
@@ -287,7 +287,7 @@ public class SnapshotFileMergingCompatibilityITCase extends 
TestLogger {
         }
     }
 
-    private static void verifyCheckpointExistOrWaitDeleted(
+    private void verifyCheckpointExistOrWaitDeleted(
             String checkpointPath,
             TernaryBoolean exist,
             boolean fileMergingEnabled,
@@ -307,8 +307,7 @@ public class SnapshotFileMergingCompatibilityITCase extends 
TestLogger {
         if (exist.equals(TernaryBoolean.TRUE)) {
             // should exist, just check
             assertThat(fs.exists(checkpointDir)).isTrue();
-            assertThat(fs.listStatus(sharedFile) != null && 
fs.listStatus(sharedFile).length > 0)
-                    .isTrue();
+            assertThat(metadata == null || verifyCheckpointExist(metadata, 
true)).isTrue();
             // Since there is no exclusive state, we should consider 
fileMergingEnabled.
             assertThat(
                             fs.listStatus(taskOwnedFile) != null
@@ -322,7 +321,7 @@ public class SnapshotFileMergingCompatibilityITCase extends 
TestLogger {
                 try {
                     fileExist =
                             (fs.exists(checkpointDir)
-                                    || (metadata != null && 
!verifyCheckpointDisposed(metadata))
+                                    || (metadata != null && 
!verifyCheckpointExist(metadata, false))
                                     || !verifyCheckpointNoDirectory(fs, 
sharedFile, taskOwnedFile));
                 } catch (IOException e) {
                     // Sometimes it may happen that the files are being 
deleted while we list them,
@@ -344,13 +343,14 @@ public class SnapshotFileMergingCompatibilityITCase 
extends TestLogger {
     }
 
     /**
-     * Traverse the checkpoint metadata and verify all the state handle is 
disposed.
+     * Traverse the checkpoint metadata and verify that every state handle exists or is disposed.
      *
      * @param metadata the metadata to traverse.
+     * @param exist true if all corresponding files should exist, false if they should be deleted
      * @return true if all corresponding files are deleted.
      */
-    private static boolean verifyCheckpointDisposed(CheckpointMetadata 
metadata) {
-        AtomicBoolean disposed = new AtomicBoolean(true);
+    private boolean verifyCheckpointExist(CheckpointMetadata metadata, boolean 
exist) {
+        AtomicBoolean result = new AtomicBoolean(true);
         for (OperatorState operatorState : metadata.getOperatorStates()) {
             for (OperatorSubtaskState subtaskState : 
operatorState.getStates()) {
                 // Check keyed state handle
@@ -359,24 +359,38 @@ public class SnapshotFileMergingCompatibilityITCase 
extends TestLogger {
                 for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
                     assertThat(keyedStateHandle)
                             
.isInstanceOf(IncrementalRemoteKeyedStateHandle.class);
-                    ((IncrementalRemoteKeyedStateHandle) keyedStateHandle)
-                            .streamSubHandles()
-                            .forEach(
-                                    handle -> {
-                                        try {
-                                            if (handle instanceof 
FileStateHandle) {
-                                                org.apache.flink.core.fs.Path 
p =
-                                                        ((FileStateHandle) 
handle).getFilePath();
-                                                if 
(p.getFileSystem().exists(p)) {
-                                                    disposed.set(false);
+                    boolean singleResult =
+                            ((IncrementalRemoteKeyedStateHandle) 
keyedStateHandle)
+                                    .streamSubHandles()
+                                    .allMatch(
+                                            handle -> {
+                                                try {
+                                                    if (handle instanceof 
FileStateHandle) {
+                                                        
org.apache.flink.core.fs.Path p =
+                                                                
((FileStateHandle) handle)
+                                                                        
.getFilePath();
+                                                        return exist == 
p.getFileSystem().exists(p);
+                                                    } else if (handle
+                                                            instanceof 
SegmentFileStateHandle) {
+                                                        
org.apache.flink.core.fs.Path p =
+                                                                
((SegmentFileStateHandle) handle)
+                                                                        
.getFilePath();
+                                                        return exist == 
p.getFileSystem().exists(p);
+                                                    }
+                                                } catch (IOException e) {
+                                                    log.warn(
+                                                            "An error occurred 
when trying to check the file existence.",
+                                                            e);
+                                                    return false;
                                                 }
-                                            }
-                                        } catch (IOException e) {
-                                            disposed.set(false);
-                                        }
-                                    });
+                                                return true;
+                                            });
+                    result.set(result.get() || singleResult);
+                    if (!result.get()) {
+                        break;
+                    }
                 }
-                if (!disposed.get()) {
+                if (!result.get()) {
                     break;
                 }
                 List<OperatorStateHandle> operatorStateHandles =
@@ -388,27 +402,30 @@ public class SnapshotFileMergingCompatibilityITCase 
extends TestLogger {
                                     ((FileMergingOperatorStreamStateHandle) 
handle)
                                             .getSharedDirHandle()
                                             .getDirectory();
-                            if (p.getFileSystem().exists(p)) {
-                                disposed.set(false);
+                            if (exist != p.getFileSystem().exists(p)) {
+                                result.set(false);
                             }
                             p =
                                     ((FileMergingOperatorStreamStateHandle) 
handle)
                                             .getTaskOwnedDirHandle()
                                             .getDirectory();
-                            if (p.getFileSystem().exists(p)) {
-                                disposed.set(false);
+                            if (exist != p.getFileSystem().exists(p)) {
+                                result.set(false);
                             }
                         } catch (IOException e) {
-                            disposed.set(false);
+                            log.warn(
+                                    "An error occurred when trying to check 
the file existence.",
+                                    e);
+                            result.set(false);
                         }
                     }
                 }
-                if (!disposed.get()) {
+                if (!result.get()) {
                     break;
                 }
             }
         }
-        return disposed.get();
+        return result.get();
     }
 
     /** Verifying that there is no subdirectory under shared and task-owned 
directory. */

Reply via email to