[FLINK-8703][tests] Port SavepointMigrationTestBase to MiniClusterResource

This closes #5701.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/60e05c05
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/60e05c05
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/60e05c05

Branch: refs/heads/release-1.5
Commit: 60e05c05f3fbf20c1276bc2f1006eb4224eda43f
Parents: 6657aa2
Author: zentol <[email protected]>
Authored: Mon Feb 26 14:54:07 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Tue Mar 20 19:03:22 2018 +0100

----------------------------------------------------------------------
 ...gacyStatefulJobSavepointMigrationITCase.java |  2 +-
 .../utils/SavepointMigrationTestBase.java       | 99 ++++++++------------
 .../StatefulJobSavepointMigrationITCase.java    |  2 +-
 3 files changed, 39 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/60e05c05/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java
index 45a6911..eee1350 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java
@@ -89,7 +89,7 @@ public class LegacyStatefulJobSavepointMigrationITCase 
extends SavepointMigratio
        private final MigrationVersion testMigrateVersion;
        private final String testStateBackend;
 
-       public 
LegacyStatefulJobSavepointMigrationITCase(Tuple2<MigrationVersion, String> 
testMigrateVersionAndBackend) {
+       public 
LegacyStatefulJobSavepointMigrationITCase(Tuple2<MigrationVersion, String> 
testMigrateVersionAndBackend) throws Exception {
                this.testMigrateVersion = testMigrateVersionAndBackend.f0;
                this.testStateBackend = testMigrateVersionAndBackend.f1;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/60e05c05/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index 2882504..91b5de8 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -21,25 +21,21 @@ package org.apache.flink.test.checkpointing.utils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
-import org.apache.flink.runtime.client.JobListeningContext;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.test.util.TestBaseUtils;
 
 import org.apache.commons.io.FileUtils;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
@@ -49,34 +45,35 @@ import java.io.File;
 import java.net.URI;
 import java.net.URL;
 import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 import static junit.framework.Assert.fail;
+import static org.junit.Assert.assertNotEquals;
 
 /**
  * Test savepoint migration.
  */
-public class SavepointMigrationTestBase extends TestBaseUtils {
+public abstract class SavepointMigrationTestBase extends TestBaseUtils {
 
        @BeforeClass
        public static void before() {
                SavepointSerializers.setFailWhenLegacyStateDetected(false);
        }
 
+       @ClassRule
+       public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
        @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
+       public final MiniClusterResource miniClusterResource;
 
        private static final Logger LOG = 
LoggerFactory.getLogger(SavepointMigrationTestBase.class);
        private static final Deadline DEADLINE = new FiniteDuration(5, 
TimeUnit.MINUTES).fromNow();
        protected static final int DEFAULT_PARALLELISM = 4;
-       protected LocalFlinkMiniCluster cluster = null;
 
        protected static String getResourceFilename(String filename) {
                ClassLoader cl = 
SavepointMigrationTestBase.class.getClassLoader();
@@ -87,17 +84,25 @@ public class SavepointMigrationTestBase extends 
TestBaseUtils {
                return resource.getFile();
        }
 
-       @Before
-       public void setup() throws Exception {
+       protected SavepointMigrationTestBase() throws Exception {
+               miniClusterResource = new MiniClusterResource(
+                       new 
MiniClusterResource.MiniClusterResourceConfiguration(
+                               getConfiguration(),
+                               1,
+                               DEFAULT_PARALLELISM),
+                       true);
+       }
 
+       private Configuration getConfiguration() throws Exception {
                // Flink configuration
                final Configuration config = new Configuration();
 
                config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
                config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
DEFAULT_PARALLELISM);
 
-               final File checkpointDir = 
tempFolder.newFolder("checkpoints").getAbsoluteFile();
-               final File savepointDir = 
tempFolder.newFolder("savepoints").getAbsoluteFile();
+               UUID id = UUID.randomUUID();
+               final File checkpointDir = TEMP_FOLDER.newFolder("checkpoints_" 
+ id).getAbsoluteFile();
+               final File savepointDir = TEMP_FOLDER.newFolder("savepoints_" + 
id).getAbsoluteFile();
 
                if (!checkpointDir.exists() || !savepointDir.exists()) {
                        throw new Exception("Test setup failed: failed to 
create (temporary) directories.");
@@ -111,12 +116,7 @@ public class SavepointMigrationTestBase extends 
TestBaseUtils {
                config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 
0);
                config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, 
savepointDir.toURI().toString());
 
-               cluster = TestBaseUtils.startCluster(config, false);
-       }
-
-       @After
-       public void teardown() throws Exception {
-               stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+               return config;
        }
 
        @SafeVarargs
@@ -125,22 +125,20 @@ public class SavepointMigrationTestBase extends 
TestBaseUtils {
                        String savepointPath,
                        Tuple2<String, Integer>... expectedAccumulators) throws 
Exception {
 
-               // Retrieve the job manager
-               ActorGateway jobManager = 
Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
+               ClusterClient<?> client = 
miniClusterResource.getClusterClient();
+               client.setDetached(true);
 
                // Submit the job
                JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
-               JobSubmissionResult jobSubmissionResult = 
cluster.submitJobDetached(jobGraph);
+               JobSubmissionResult jobSubmissionResult = 
client.submitJob(jobGraph, SavepointMigrationTestBase.class.getClassLoader());
 
                LOG.info("Submitted job {} and waiting...", 
jobSubmissionResult.getJobID());
 
-               StandaloneClusterClient clusterClient = new 
StandaloneClusterClient(cluster.configuration());
-
                boolean done = false;
                while (DEADLINE.hasTimeLeft()) {
                        Thread.sleep(100);
-                       Map<String, Object> accumulators = 
clusterClient.getAccumulators(jobSubmissionResult.getJobID());
+                       Map<String, Object> accumulators = 
client.getAccumulators(jobSubmissionResult.getJobID());
 
                        boolean allDone = true;
                        for (Tuple2<String, Integer> acc : 
expectedAccumulators) {
@@ -166,18 +164,9 @@ public class SavepointMigrationTestBase extends 
TestBaseUtils {
 
                LOG.info("Triggering savepoint.");
 
-               final Future<Object> savepointResultFuture =
-                               jobManager.ask(new 
JobManagerMessages.TriggerSavepoint(jobSubmissionResult.getJobID(), 
Option.<String>empty()), DEADLINE.timeLeft());
+               CompletableFuture<String> savepointPathFuture = 
client.triggerSavepoint(jobSubmissionResult.getJobID(), null);
 
-               Object savepointResult = Await.result(savepointResultFuture, 
DEADLINE.timeLeft());
-
-               if (savepointResult instanceof 
JobManagerMessages.TriggerSavepointFailure) {
-                       fail("Error drawing savepoint: " + 
((JobManagerMessages.TriggerSavepointFailure) savepointResult).cause());
-               }
-
-               // jobmanager will store savepoint in heap, we have to retrieve 
it
-               final String jobmanagerSavepointPath = 
((JobManagerMessages.TriggerSavepointSuccess) savepointResult).savepointPath();
-               LOG.info("Saved savepoint: " + jobmanagerSavepointPath);
+               String jobmanagerSavepointPath = 
savepointPathFuture.get(DEADLINE.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
                File jobManagerSavepoint = new File(new 
URI(jobmanagerSavepointPath).getPath());
                // savepoints were changed to be directories in Flink 1.3
@@ -194,18 +183,15 @@ public class SavepointMigrationTestBase extends 
TestBaseUtils {
                        String savepointPath,
                        Tuple2<String, Integer>... expectedAccumulators) throws 
Exception {
 
-               // Retrieve the job manager
-               Await.result(cluster.leaderGateway().future(), 
DEADLINE.timeLeft());
+               ClusterClient<?> client = 
miniClusterResource.getClusterClient();
+               client.setDetached(true);
 
                // Submit the job
                JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
                
jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
-               JobSubmissionResult jobSubmissionResult = 
cluster.submitJobDetached(jobGraph);
-
-               StandaloneClusterClient clusterClient = new 
StandaloneClusterClient(cluster.configuration());
-               JobListeningContext jobListeningContext = 
clusterClient.connectToJob(jobSubmissionResult.getJobID());
+               JobSubmissionResult jobSubmissionResult = 
client.submitJob(jobGraph, SavepointMigrationTestBase.class.getClassLoader());
 
                boolean done = false;
                while (DEADLINE.hasTimeLeft()) {
@@ -213,30 +199,19 @@ public class SavepointMigrationTestBase extends 
TestBaseUtils {
                        // try and get a job result, this will fail if the job 
already failed. Use this
                        // to get out of this loop
                        JobID jobId = jobSubmissionResult.getJobID();
-                       FiniteDuration timeout = FiniteDuration.apply(5, 
TimeUnit.SECONDS);
 
                        try {
+                               CompletableFuture<JobStatus> jobStatusFuture = 
client.getJobStatus(jobSubmissionResult.getJobID());
 
-                               Future<Object> future = clusterClient
-                                               .getJobManagerGateway()
-                                               
.ask(JobManagerMessages.getRequestJobStatus(jobSubmissionResult.getJobID()), 
timeout);
+                               JobStatus jobStatus = jobStatusFuture.get(5, 
TimeUnit.SECONDS);
 
-                               Object result = Await.result(future, timeout);
-
-                               if (result instanceof 
JobManagerMessages.CurrentJobStatus) {
-                                       if 
(((JobManagerMessages.CurrentJobStatus) result).status() == JobStatus.FAILED) {
-                                               Object jobResult = Await.result(
-                                                               
jobListeningContext.getJobResultFuture(),
-                                                               
Duration.apply(5, TimeUnit.SECONDS));
-                                               fail("Job failed: " + 
jobResult);
-                                       }
-                               }
+                               assertNotEquals(JobStatus.FAILED, jobStatus);
                        } catch (Exception e) {
                                fail("Could not connect to job: " + e);
                        }
 
                        Thread.sleep(100);
-                       Map<String, Object> accumulators = 
clusterClient.getAccumulators(jobId);
+                       Map<String, Object> accumulators = 
client.getAccumulators(jobId);
 
                        boolean allDone = true;
                        for (Tuple2<String, Integer> acc : 
expectedAccumulators) {

http://git-wip-us.apache.org/repos/asf/flink/blob/60e05c05/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
index 53a5353..d2de881 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
@@ -95,7 +95,7 @@ public class StatefulJobSavepointMigrationITCase extends 
SavepointMigrationTestB
        private final MigrationVersion testMigrateVersion;
        private final String testStateBackend;
 
-       public StatefulJobSavepointMigrationITCase(Tuple2<MigrationVersion, 
String> testMigrateVersionAndBackend) {
+       public StatefulJobSavepointMigrationITCase(Tuple2<MigrationVersion, 
String> testMigrateVersionAndBackend) throws Exception {
                this.testMigrateVersion = testMigrateVersionAndBackend.f0;
                this.testStateBackend = testMigrateVersionAndBackend.f1;
        }

Reply via email to