This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2b1cf60 [FLINK-21880][tests] Ignore incomplete checkpoints in
UnalignedCheckpointRescaleITCase
2b1cf60 is described below
commit 2b1cf60219ba3ae88c3bd7e2537f726caf00e16d
Author: Roman Khachatryan <[email protected]>
AuthorDate: Thu Apr 1 18:01:16 2021 +0200
[FLINK-21880][tests] Ignore incomplete checkpoints in
UnalignedCheckpointRescaleITCase
---
.../checkpointing/UnalignedCheckpointTestBase.java | 43 ++++++++++++++++++----
1 file changed, 36 insertions(+), 7 deletions(-)
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
index a4a2b52..a55433a 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
@@ -64,6 +64,7 @@ import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
import org.apache.flink.util.Collector;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.LogLevelRule;
import org.apache.flink.util.TestLogger;
@@ -86,6 +87,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.nio.file.attribute.BasicFileAttributes;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
@@ -98,6 +100,8 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.CHECKPOINT_DIR_PREFIX;
+import static
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.METADATA_FILE_NAME;
import static
org.apache.flink.shaded.guava18.com.google.common.collect.Iterables.getOnlyElement;
import static
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT;
import static org.apache.flink.util.Preconditions.checkState;
@@ -163,13 +167,11 @@ public abstract class UnalignedCheckpointTestBase extends
TestLogger {
.get()
.toJobExecutionResult(getClass().getClassLoader()));
} catch (Exception e) {
+ if (!ExceptionUtils.findThrowable(e,
TestException.class).isPresent()) {
+ throw e;
+ }
if (settings.generateCheckpoint) {
- return Files.find(
- checkpointDir.toPath(),
- 2,
- (file, attr) ->
- attr.isDirectory()
- &&
file.getFileName().toString().startsWith("chk"))
+ return Files.find(checkpointDir.toPath(), 2,
this::isCompletedCheckpoint)
.min(Comparator.comparing(Path::toString))
.map(Path::toFile)
.orElseThrow(
@@ -185,6 +187,27 @@ public abstract class UnalignedCheckpointTestBase extends
TestLogger {
return null;
}
+ private boolean isCompletedCheckpoint(Path path, BasicFileAttributes attr)
{
+ return attr.isDirectory()
+ &&
path.getFileName().toString().startsWith(CHECKPOINT_DIR_PREFIX)
+ && hasMetadata(path);
+ }
+
+ private boolean hasMetadata(Path file) {
+ try {
+ return Files.find(
+ file.toAbsolutePath(),
+ 1,
+ (path, attrs) ->
+
path.getFileName().toString().equals(METADATA_FILE_NAME))
+ .findAny()
+ .isPresent();
+ } catch (IOException e) {
+ ExceptionUtils.rethrow(e);
+ return false; // should never happen
+ }
+ }
+
private StreamGraph getStreamGraph(UnalignedSettings settings,
Configuration conf) {
// a dummy environment used to retrieve the DAG, mini cluster will be
used later
final StreamExecutionEnvironment setupEnv =
@@ -849,7 +872,7 @@ public abstract class UnalignedCheckpointTestBase extends
TestLogger {
}
private void failMapper(String description) throws Exception {
- throw new Exception(
+ throw new TestException(
"Failing "
+ description
+ " @ "
@@ -1106,4 +1129,10 @@ public abstract class UnalignedCheckpointTestBase
extends TestLogger {
}
return value;
}
+
+ private static class TestException extends Exception {
+ public TestException(String s) {
+ super(s);
+ }
+ }
}