This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b1cf60  [FLINK-21880][tests] Ignore incomplete checkpoints in UnalignedCheckpointRescaleITCase
2b1cf60 is described below

commit 2b1cf60219ba3ae88c3bd7e2537f726caf00e16d
Author: Roman Khachatryan <[email protected]>
AuthorDate: Thu Apr 1 18:01:16 2021 +0200

    [FLINK-21880][tests] Ignore incomplete checkpoints in UnalignedCheckpointRescaleITCase
---
 .../checkpointing/UnalignedCheckpointTestBase.java | 43 ++++++++++++++++++----
 1 file changed, 36 insertions(+), 7 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
index a4a2b52..a55433a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
@@ -64,6 +64,7 @@ import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.LogLevelRule;
 import org.apache.flink.util.TestLogger;
 
@@ -86,6 +87,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.attribute.BasicFileAttributes;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -98,6 +100,8 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.CHECKPOINT_DIR_PREFIX;
+import static org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.METADATA_FILE_NAME;
 import static org.apache.flink.shaded.guava18.com.google.common.collect.Iterables.getOnlyElement;
 import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -163,13 +167,11 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
                             .get()
                             .toJobExecutionResult(getClass().getClassLoader()));
         } catch (Exception e) {
+            if (!ExceptionUtils.findThrowable(e, TestException.class).isPresent()) {
+                throw e;
+            }
             if (settings.generateCheckpoint) {
-                return Files.find(
-                                checkpointDir.toPath(),
-                                2,
-                                (file, attr) ->
-                                        attr.isDirectory()
-                                                && file.getFileName().toString().startsWith("chk"))
+                return Files.find(checkpointDir.toPath(), 2, this::isCompletedCheckpoint)
                         .min(Comparator.comparing(Path::toString))
                         .map(Path::toFile)
                         .orElseThrow(
@@ -185,6 +187,27 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
         return null;
     }
 
+    private boolean isCompletedCheckpoint(Path path, BasicFileAttributes attr) {
+        return attr.isDirectory()
+                && path.getFileName().toString().startsWith(CHECKPOINT_DIR_PREFIX)
+                && hasMetadata(path);
+    }
+
+    private boolean hasMetadata(Path file) {
+        try {
+            return Files.find(
+                            file.toAbsolutePath(),
+                            1,
+                            (path, attrs) ->
+                                    path.getFileName().toString().equals(METADATA_FILE_NAME))
+                    .findAny()
+                    .isPresent();
+        } catch (IOException e) {
+            ExceptionUtils.rethrow(e);
+            return false; // should never happen
+        }
+    }
+
     private StreamGraph getStreamGraph(UnalignedSettings settings, Configuration conf) {
         // a dummy environment used to retrieve the DAG, mini cluster will be used later
         final StreamExecutionEnvironment setupEnv =
@@ -849,7 +872,7 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
         }
 
         private void failMapper(String description) throws Exception {
-            throw new Exception(
+            throw new TestException(
                     "Failing "
                             + description
                             + " @ "
@@ -1106,4 +1129,10 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
         }
         return value;
     }
+
+    private static class TestException extends Exception {
+        public TestException(String s) {
+            super(s);
+        }
+    }
 }

Reply via email to