[FLINK-9042][tests] Port ResumeCheckpointManuallyITCase to flip6

This closes #5736.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7c553ba4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7c553ba4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7c553ba4

Branch: refs/heads/master
Commit: 7c553ba45b44145ea09e4d9ccb0bdf64df7ee076
Parents: db366cd
Author: zentol <ches...@apache.org>
Authored: Wed Mar 21 13:31:56 2018 +0100
Committer: zentol <ches...@apache.org>
Committed: Wed Apr 4 08:59:05 2018 +0200

----------------------------------------------------------------------
 .../ResumeCheckpointManuallyITCase.java         | 146 +++++++++++++------
 1 file changed, 104 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7c553ba4/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index 537f864..add4243 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -18,25 +18,26 @@
 
 package org.apache.flink.test.checkpointing;
 
-import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.test.state.ManualWindowSpeedITCase;
+import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.curator.test.TestingServer;
@@ -44,9 +45,17 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertNotNull;
 
 /**
  * IT case for resuming from checkpoints manually via their external pointer, 
rather than automatic
@@ -240,14 +249,10 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
 
                final Configuration config = new Configuration();
 
-               config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
NUM_TASK_MANAGERS);
-               config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
SLOTS_PER_TASK_MANAGER);
-
                final File savepointDir = temporaryFolder.newFolder();
 
                config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkpointDir.toURI().toString());
                config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, 
savepointDir.toURI().toString());
-               config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkpointDir.toURI().toString());
 
                if (localRecovery) {
                        config.setString(
@@ -263,56 +268,113 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
                        
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
haDir.toURI().toString());
                }
 
-               TestingCluster cluster = new TestingCluster(config);
-               cluster.start();
+               MiniClusterResource cluster = new MiniClusterResource(
+                       new 
MiniClusterResource.MiniClusterResourceConfiguration(
+                               config,
+                               NUM_TASK_MANAGERS,
+                               SLOTS_PER_TASK_MANAGER),
+                       true);
+
+               cluster.before();
 
-               String externalCheckpoint = null;
+               ClusterClient<?> client = cluster.getClusterClient();
+               client.setDetached(true);
 
                try {
+                       // main test sequence:  start job -> eCP -> restore job 
-> eCP -> restore job
+                       String firstExternalCheckpoint = 
runJobAndGetExternalizedCheckpoint(backend, checkpointDir, null, client);
+                       assertNotNull(firstExternalCheckpoint);
+
+                       String secondExternalCheckpoint = 
runJobAndGetExternalizedCheckpoint(backend, checkpointDir, 
firstExternalCheckpoint, client);
+                       assertNotNull(secondExternalCheckpoint);
 
-                       // main test sequence:  start job -> eCP -> restore job 
-> eCP -> restore job -> eCP
-                       for (int i = 0; i < 3; ++i) {
-                               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+                       String thirdExternalCheckpoint = 
runJobAndGetExternalizedCheckpoint(backend, checkpointDir, 
secondExternalCheckpoint, client);
+                       assertNotNull(thirdExternalCheckpoint);
+               } finally {
+                       cluster.after();
+               }
+       }
 
-                               env.setStateBackend(backend);
-                               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
-                               env.setParallelism(PARALLELISM);
+       private static String runJobAndGetExternalizedCheckpoint(StateBackend 
backend, File checkpointDir, @Nullable String externalCheckpoint, 
ClusterClient<?> client) throws Exception {
+               JobGraph initialJobGraph = getJobGraph(backend, 
externalCheckpoint);
+               NotifyingInfiniteTupleSource.countDownLatch = new 
CountDownLatch(PARALLELISM);
 
-                               // initialize count down latch
-                               NotifyingInfiniteTupleSource.countDownLatch = 
new CountDownLatch(PARALLELISM);
+               client.submitJob(initialJobGraph, 
ResumeCheckpointManuallyITCase.class.getClassLoader());
 
-                               env.addSource(new 
NotifyingInfiniteTupleSource(10_000))
-                                       .keyBy(0)
-                                       .timeWindow(Time.seconds(3))
-                                       .reduce((value1, value2) -> 
Tuple2.of(value1.f0, value1.f1 + value2.f1))
-                                       .filter(value -> 
value.f0.startsWith("Tuple 0"));
+               // wait until all sources have been started
+               NotifyingInfiniteTupleSource.countDownLatch.await();
 
-                               StreamGraph streamGraph = env.getStreamGraph();
-                               streamGraph.setJobName("Test");
+               waitUntilExternalizedCheckpointCreated(checkpointDir, 
initialJobGraph.getJobID());
+               client.cancel(initialJobGraph.getJobID());
+               waitUntilCanceled(initialJobGraph.getJobID(), client);
 
-                               JobGraph jobGraph = streamGraph.getJobGraph();
+               return getExternalizedCheckpointCheckpointPath(checkpointDir, 
initialJobGraph.getJobID());
+       }
+
+       private static String getExternalizedCheckpointCheckpointPath(File 
checkpointDir, JobID jobId) throws IOException {
+               Optional<Path> checkpoint = 
findExternalizedCheckpoint(checkpointDir, jobId);
+               if (!checkpoint.isPresent()) {
+                       throw new AssertionError("No complete checkpoint could 
be found.");
+               } else {
+                       return checkpoint.get().toString();
+               }
+       }
 
-                               // recover from previous iteration?
-                               if (externalCheckpoint != null) {
-                                       
jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(externalCheckpoint));
+       private static void waitUntilExternalizedCheckpointCreated(File 
checkpointDir, JobID jobId) throws InterruptedException, IOException {
+               while (true) {
+                       Thread.sleep(50);
+                       Optional<Path> externalizedCheckpoint = 
findExternalizedCheckpoint(checkpointDir, jobId);
+                       if (externalizedCheckpoint.isPresent()) {
+                               break;
+                       }
+               }
+       }
+
+       private static Optional<Path> findExternalizedCheckpoint(File 
checkpointDir, JobID jobId) throws IOException {
+               return 
Files.list(checkpointDir.toPath().resolve(jobId.toString()))
+                       .filter(path -> 
path.getFileName().toString().startsWith("chk-"))
+                       .filter(path -> {
+                               try {
+                                       return Files.list(path).anyMatch(child 
-> child.getFileName().toString().contains("meta"));
+                               } catch (IOException ignored) {
+                                       return false;
                                }
+                       })
+                       .findAny();
+       }
 
-                               config.addAll(jobGraph.getJobConfiguration());
-                               JobSubmissionResult submissionResult = 
cluster.submitJobDetached(jobGraph);
+       private static void waitUntilCanceled(JobID jobId, ClusterClient<?> 
client) throws ExecutionException, InterruptedException {
+               while (client.getJobStatus(jobId).get() != 
JobStatus.CANCELLING) {
+                       Thread.sleep(50);
+               }
+       }
 
-                               // wait until all sources have been started
-                               
NotifyingInfiniteTupleSource.countDownLatch.await();
+       private static JobGraph getJobGraph(StateBackend backend, @Nullable 
String externalCheckpoint) {
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
-                               externalCheckpoint = cluster.requestCheckpoint(
-                                               submissionResult.getJobID(),
-                                               
CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION);
+               env.enableCheckpointing(500);
+               env.setStateBackend(backend);
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+               env.setParallelism(PARALLELISM);
+               
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 
-                               cluster.cancelJob(submissionResult.getJobID());
-                       }
-               } finally {
-                       cluster.stop();
-                       cluster.awaitTermination();
+               env.addSource(new NotifyingInfiniteTupleSource(10_000))
+                       .keyBy(0)
+                       .timeWindow(Time.seconds(3))
+                       .reduce((value1, value2) -> Tuple2.of(value1.f0, 
value1.f1 + value2.f1))
+                       .filter(value -> value.f0.startsWith("Tuple 0"));
+
+               StreamGraph streamGraph = env.getStreamGraph();
+               streamGraph.setJobName("Test");
+
+               JobGraph jobGraph = streamGraph.getJobGraph();
+
+               // recover from previous iteration?
+               if (externalCheckpoint != null) {
+                       
jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(externalCheckpoint));
                }
+
+               return jobGraph;
        }
 
        /**

Reply via email to