[FLINK-8703][tests] Port SavepointMigrationTestBase to MiniClusterResource This closes #5701.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/60e05c05 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/60e05c05 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/60e05c05 Branch: refs/heads/release-1.5 Commit: 60e05c05f3fbf20c1276bc2f1006eb4224eda43f Parents: 6657aa2 Author: zentol <[email protected]> Authored: Mon Feb 26 14:54:07 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Tue Mar 20 19:03:22 2018 +0100 ---------------------------------------------------------------------- ...gacyStatefulJobSavepointMigrationITCase.java | 2 +- .../utils/SavepointMigrationTestBase.java | 99 ++++++++------------ .../StatefulJobSavepointMigrationITCase.java | 2 +- 3 files changed, 39 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/60e05c05/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java index 45a6911..eee1350 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java @@ -89,7 +89,7 @@ public class LegacyStatefulJobSavepointMigrationITCase extends SavepointMigratio private final MigrationVersion testMigrateVersion; private final String testStateBackend; - public LegacyStatefulJobSavepointMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) { + public LegacyStatefulJobSavepointMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception { this.testMigrateVersion = testMigrateVersionAndBackend.f0; this.testStateBackend = testMigrateVersionAndBackend.f1; } http://git-wip-us.apache.org/repos/asf/flink/blob/60e05c05/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java index 2882504..91b5de8 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java @@ -21,25 +21,21 @@ package org.apache.flink.test.checkpointing.utils; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.client.program.StandaloneClusterClient; +import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers; -import org.apache.flink.runtime.client.JobListeningContext; -import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; -import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.util.MiniClusterResource; import org.apache.flink.test.util.TestBaseUtils; import org.apache.commons.io.FileUtils; -import org.junit.After; -import org.junit.Before; import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Rule; import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; @@ -49,34 +45,35 @@ import java.io.File; import java.net.URI; import java.net.URL; import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import scala.Option; -import scala.concurrent.Await; -import scala.concurrent.Future; import scala.concurrent.duration.Deadline; -import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; import static junit.framework.Assert.fail; +import static org.junit.Assert.assertNotEquals; /** * Test savepoint migration. */ -public class SavepointMigrationTestBase extends TestBaseUtils { +public abstract class SavepointMigrationTestBase extends TestBaseUtils { @BeforeClass public static void before() { SavepointSerializers.setFailWhenLegacyStateDetected(false); } + @ClassRule + public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(); + @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); + public final MiniClusterResource miniClusterResource; private static final Logger LOG = LoggerFactory.getLogger(SavepointMigrationTestBase.class); private static final Deadline DEADLINE = new FiniteDuration(5, TimeUnit.MINUTES).fromNow(); protected static final int DEFAULT_PARALLELISM = 4; - protected LocalFlinkMiniCluster cluster = null; protected static String getResourceFilename(String filename) { ClassLoader cl = SavepointMigrationTestBase.class.getClassLoader(); @@ -87,17 +84,25 @@ public class SavepointMigrationTestBase extends TestBaseUtils { return resource.getFile(); } - @Before - public void setup() throws Exception { + protected SavepointMigrationTestBase() throws Exception { + miniClusterResource = new MiniClusterResource( + new MiniClusterResource.MiniClusterResourceConfiguration( + getConfiguration(), + 1, + DEFAULT_PARALLELISM), + true); + } + private Configuration getConfiguration() throws Exception { // Flink configuration final Configuration config = new Configuration(); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, DEFAULT_PARALLELISM); - final File checkpointDir = tempFolder.newFolder("checkpoints").getAbsoluteFile(); - final File savepointDir = tempFolder.newFolder("savepoints").getAbsoluteFile(); + UUID id = UUID.randomUUID(); + final File checkpointDir = TEMP_FOLDER.newFolder("checkpoints_" + id).getAbsoluteFile(); + final File savepointDir = TEMP_FOLDER.newFolder("savepoints_" + id).getAbsoluteFile(); if (!checkpointDir.exists() || !savepointDir.exists()) { throw new Exception("Test setup failed: failed to create (temporary) directories."); @@ -111,12 +116,7 @@ public class SavepointMigrationTestBase extends TestBaseUtils { config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0); config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString()); - cluster = TestBaseUtils.startCluster(config, false); - } - - @After - public void teardown() throws Exception { - stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT); + return config; } @SafeVarargs @@ -125,22 +125,20 @@ public class SavepointMigrationTestBase extends TestBaseUtils { String savepointPath, Tuple2<String, Integer>... expectedAccumulators) throws Exception { - // Retrieve the job manager - ActorGateway jobManager = Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft()); + ClusterClient<?> client = miniClusterResource.getClusterClient(); + client.setDetached(true); // Submit the job JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph); + JobSubmissionResult jobSubmissionResult = client.submitJob(jobGraph, SavepointMigrationTestBase.class.getClassLoader()); LOG.info("Submitted job {} and waiting...", jobSubmissionResult.getJobID()); - StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration()); - boolean done = false; while (DEADLINE.hasTimeLeft()) { Thread.sleep(100); - Map<String, Object> accumulators = clusterClient.getAccumulators(jobSubmissionResult.getJobID()); + Map<String, Object> accumulators = client.getAccumulators(jobSubmissionResult.getJobID()); boolean allDone = true; for (Tuple2<String, Integer> acc : expectedAccumulators) { @@ -166,18 +164,9 @@ public class SavepointMigrationTestBase extends TestBaseUtils { LOG.info("Triggering savepoint."); - final Future<Object> savepointResultFuture = - jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobSubmissionResult.getJobID(), Option.<String>empty()), DEADLINE.timeLeft()); + CompletableFuture<String> savepointPathFuture = client.triggerSavepoint(jobSubmissionResult.getJobID(), null); - Object savepointResult = Await.result(savepointResultFuture, DEADLINE.timeLeft()); - - if (savepointResult instanceof JobManagerMessages.TriggerSavepointFailure) { - fail("Error drawing savepoint: " + ((JobManagerMessages.TriggerSavepointFailure) savepointResult).cause()); - } - - // jobmanager will store savepoint in heap, we have to retrieve it - final String jobmanagerSavepointPath = ((JobManagerMessages.TriggerSavepointSuccess) savepointResult).savepointPath(); - LOG.info("Saved savepoint: " + jobmanagerSavepointPath); + String jobmanagerSavepointPath = savepointPathFuture.get(DEADLINE.timeLeft().toMillis(), TimeUnit.MILLISECONDS); File jobManagerSavepoint = new File(new URI(jobmanagerSavepointPath).getPath()); // savepoints were changed to be directories in Flink 1.3 @@ -194,18 +183,15 @@ public class SavepointMigrationTestBase extends TestBaseUtils { String savepointPath, Tuple2<String, Integer>... expectedAccumulators) throws Exception { - // Retrieve the job manager - Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft()); + ClusterClient<?> client = miniClusterResource.getClusterClient(); + client.setDetached(true); // Submit the job JobGraph jobGraph = env.getStreamGraph().getJobGraph(); jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath)); - JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph); - - StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration()); - JobListeningContext jobListeningContext = clusterClient.connectToJob(jobSubmissionResult.getJobID()); + JobSubmissionResult jobSubmissionResult = client.submitJob(jobGraph, SavepointMigrationTestBase.class.getClassLoader()); boolean done = false; while (DEADLINE.hasTimeLeft()) { @@ -213,30 +199,19 @@ public class SavepointMigrationTestBase extends TestBaseUtils { // try and get a job result, this will fail if the job already failed. Use this // to get out of this loop JobID jobId = jobSubmissionResult.getJobID(); - FiniteDuration timeout = FiniteDuration.apply(5, TimeUnit.SECONDS); try { + CompletableFuture<JobStatus> jobStatusFuture = client.getJobStatus(jobSubmissionResult.getJobID()); - Future<Object> future = clusterClient - .getJobManagerGateway() - .ask(JobManagerMessages.getRequestJobStatus(jobSubmissionResult.getJobID()), timeout); + JobStatus jobStatus = jobStatusFuture.get(5, TimeUnit.SECONDS); - Object result = Await.result(future, timeout); - - if (result instanceof JobManagerMessages.CurrentJobStatus) { - if (((JobManagerMessages.CurrentJobStatus) result).status() == JobStatus.FAILED) { - Object jobResult = Await.result( - jobListeningContext.getJobResultFuture(), - Duration.apply(5, TimeUnit.SECONDS)); - fail("Job failed: " + jobResult); - } - } + assertNotEquals(JobStatus.FAILED, jobStatus); } catch (Exception e) { fail("Could not connect to job: " + e); } Thread.sleep(100); - Map<String, Object> accumulators = clusterClient.getAccumulators(jobId); + Map<String, Object> accumulators = client.getAccumulators(jobId); boolean allDone = true; for (Tuple2<String, Integer> acc : expectedAccumulators) { http://git-wip-us.apache.org/repos/asf/flink/blob/60e05c05/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java index 53a5353..d2de881 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java @@ -95,7 +95,7 @@ public class StatefulJobSavepointMigrationITCase extends SavepointMigrationTestB private final MigrationVersion testMigrateVersion; private final String testStateBackend; - public StatefulJobSavepointMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) { + public StatefulJobSavepointMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception { this.testMigrateVersion = testMigrateVersionAndBackend.f0; this.testStateBackend = testMigrateVersionAndBackend.f1; }
