[FLINK-8797] Port AbstractOperatorRestoreTestBase to MiniClusterResource

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6732669a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6732669a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6732669a

Branch: refs/heads/master
Commit: 6732669a684de0b230046b8f4291e367e35d9477
Parents: ccb78b0
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Tue Feb 27 13:42:09 2018 +0100
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Sun Mar 11 08:17:21 2018 -0700

----------------------------------------------------------------------
 .../AbstractOperatorRestoreTestBase.java        | 248 ++++++-------------
 1 file changed, 81 insertions(+), 167 deletions(-)
----------------------------------------------------------------------
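
The gist of the change: the hand-rolled Akka-based JobManager/TaskManager setup is replaced by a shared MiniClusterResource class rule, and the test now drives jobs through a ClusterClient. A minimal sketch of that pattern follows; class names, constructor arguments and the trailing boolean flag mirror the diff below, while the surrounding test class is hypothetical.

    // Sketch only: mirrors the MiniClusterResource usage introduced by this commit.
    import org.apache.flink.client.program.ClusterClient;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.test.util.MiniClusterResource;

    import org.junit.ClassRule;
    import org.junit.Test;

    public class ExampleMiniClusterTest {

        // One shared mini cluster per test class instead of manually started actors.
        @ClassRule
        public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
            new MiniClusterResource.MiniClusterResourceConfiguration(
                new Configuration(),
                1,  // number of TaskManagers
                4), // slots per TaskManager
            true);  // trailing flag copied verbatim from the diff below

        @Test
        public void exampleUsage() throws Exception {
            // Jobs are submitted through the ClusterClient rather than via Akka messages.
            ClusterClient<?> clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient();
            clusterClient.setDetached(true);
            // clusterClient.submitJob(jobGraph, getClass().getClassLoader());
        }
    }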


http://git-wip-us.apache.org/repos/asf/flink/blob/6732669a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index 9710c20..72f700a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -19,55 +19,40 @@
 package org.apache.flink.test.state.operator.restore;
 
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.testingUtils.TestingJobManager;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
-import org.apache.flink.runtime.testingUtils.TestingTaskManager;
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.net.URL;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
-import scala.Option;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 /**
  * Abstract class to verify that it is possible to migrate a savepoint across upgraded Flink versions and that the
@@ -79,16 +64,21 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
 
+       private static final int NUM_TMS = 1;
+       private static final int NUM_SLOTS_PER_TM = 4;
+       private static final Duration TEST_TIMEOUT = Duration.ofSeconds(10000L);
+
        @Rule
        public final TemporaryFolder tmpFolder = new TemporaryFolder();
 
-       private static ActorSystem actorSystem = null;
-       private static HighAvailabilityServices highAvailabilityServices = null;
-       private static ActorGateway jobManager = null;
-       private static ActorGateway archiver = null;
-       private static ActorGateway taskManager = null;
+       @ClassRule
+       public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+               new MiniClusterResource.MiniClusterResourceConfiguration(
+                       new Configuration(),
+                       NUM_TMS,
+                       NUM_SLOTS_PER_TM),
+               true);
 
-       private static final FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
        private final boolean allowNonRestoredState;
 
        protected AbstractOperatorRestoreTestBase() {
@@ -104,91 +94,21 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
                SavepointSerializers.setFailWhenLegacyStateDetected(false);
        }
 
-       @BeforeClass
-       public static void setupCluster() throws Exception {
-               final Configuration configuration = new Configuration();
-
-               FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
-
-               actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
-
-               highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
-                       configuration,
-                       TestingUtils.defaultExecutor());
-
-               Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
-                       configuration,
-                       actorSystem,
-                       TestingUtils.defaultExecutor(),
-                       TestingUtils.defaultExecutor(),
-                       highAvailabilityServices,
-                       NoOpMetricRegistry.INSTANCE,
-                       Option.empty(),
-                       Option.apply("jm"),
-                       Option.apply("arch"),
-                       TestingJobManager.class,
-                       TestingMemoryArchivist.class);
-
-               jobManager = LeaderRetrievalUtils.retrieveLeaderGateway(
-                       highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
-                       actorSystem,
-                       timeout);
-
-               archiver = new AkkaActorGateway(master._2(), jobManager.leaderSessionID());
-
-               Configuration tmConfig = new Configuration();
-               tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
-
-               ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
-                       tmConfig,
-                       ResourceID.generate(),
-                       actorSystem,
-                       highAvailabilityServices,
-                       NoOpMetricRegistry.INSTANCE,
-                       "localhost",
-                       Option.apply("tm"),
-                       true,
-                       TestingTaskManager.class);
-
-               taskManager = new AkkaActorGateway(taskManagerRef, jobManager.leaderSessionID());
-
-               // Wait until connected
-               Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
-               Await.ready(taskManager.ask(msg, timeout), timeout);
-       }
-
-       @AfterClass
-       public static void tearDownCluster() throws Exception {
-               if (highAvailabilityServices != null) {
-                       highAvailabilityServices.closeAndCleanupAllData();
-               }
-
-               if (actorSystem != null) {
-                       actorSystem.shutdown();
-               }
-
-               if (archiver != null) {
-                       archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
-               }
-
-               if (jobManager != null) {
-                       jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
-               }
-
-               if (taskManager != null) {
-                       taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
-               }
-       }
-
        @Test
        public void testMigrationAndRestore() throws Throwable {
+               ClassLoader classLoader = this.getClass().getClassLoader();
+               ClusterClient<?> clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient();
+               clusterClient.setDetached(true);
+               final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
+
                // submit job with old version savepoint and create a migrated savepoint in the new version
-               String savepointPath = migrateJob();
+               String savepointPath = migrateJob(classLoader, clusterClient, deadline);
                // restore from migrated new version savepoint
-               restoreJob(savepointPath);
+               restoreJob(classLoader, clusterClient, deadline, savepointPath);
        }
 
-       private String migrateJob() throws Throwable {
+       private String migrateJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline) throws Throwable {
+
                URL savepointResource = AbstractOperatorRestoreTestBase.class.getClassLoader().getResource("operatorstate/" + getMigrationSavepointName());
                if (savepointResource == null) {
                        throw new IllegalArgumentException("Savepoint file does not exist.");
@@ -196,86 +116,80 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
                JobGraph jobToMigrate = createJobGraph(ExecutionMode.MIGRATE);
                jobToMigrate.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointResource.getFile()));
 
-               Object msg;
-               Object result;
-
-               // Submit job graph
-               msg = new JobManagerMessages.SubmitJob(jobToMigrate, ListeningBehaviour.DETACHED);
-               result = Await.result(jobManager.ask(msg, timeout), timeout);
+               assertNotNull(jobToMigrate.getJobID());
 
-               if (result instanceof JobManagerMessages.JobResultFailure) {
-                       JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result;
-                       throw new Exception(failure.cause());
-               }
-               Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
+               clusterClient.submitJob(jobToMigrate, classLoader);
 
-               // Wait for all tasks to be running
-               msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobToMigrate.getJobID());
-               Await.result(jobManager.ask(msg, timeout), timeout);
+               CompletableFuture<JobStatus> jobRunningFuture = FutureUtils.retrySuccesfulWithDelay(
+                       () -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
+                       Time.milliseconds(50),
+                       deadline,
+                       (jobStatus) -> jobStatus == JobStatus.RUNNING,
+                       TestingUtils.defaultScheduledExecutor());
+               assertEquals(
+                       JobStatus.RUNNING,
+                       jobRunningFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
 
                // Trigger savepoint
                File targetDirectory = tmpFolder.newFolder();
-               msg = new JobManagerMessages.CancelJobWithSavepoint(jobToMigrate.getJobID(), targetDirectory.getAbsolutePath());
+               String savepointPath = null;
 
                // FLINK-6918: Retry cancel with savepoint message in case that StreamTasks were not running
                // TODO: The retry logic should be removed once the StreamTask lifecycle has been fixed (see FLINK-4714)
-               boolean retry = true;
-               for (int i = 0; retry && i < 10; i++) {
-                       Future<Object> future = jobManager.ask(msg, timeout);
-                       result = Await.result(future, timeout);
-
-                       if (result instanceof JobManagerMessages.CancellationFailure) {
-                               Thread.sleep(50L);
-                       } else {
-                               retry = false;
+               while (deadline.hasTimeLeft() && savepointPath == null) {
+                       try {
+                               savepointPath = clusterClient.cancelWithSavepoint(
+                                       jobToMigrate.getJobID(),
+                                       targetDirectory.getAbsolutePath());
+                       } catch (Exception e) {
+                               String exceptionString = ExceptionUtils.stringifyException(e);
+                               if (!(exceptionString.matches("(.*\n)*.*savepoint for the job .* failed(.*\n)*") // legacy
+                                               || exceptionString.matches("(.*\n)*.*Not all required tasks are currently running(.*\n)*") // flip6
+                                               || exceptionString.matches("(.*\n)*.*Checkpoint was declined \\(tasks not ready\\)(.*\n)*"))) { // flip6
+                                       throw e;
+                               }
                        }
                }
 
-               if (result instanceof JobManagerMessages.CancellationFailure) {
-                       JobManagerMessages.CancellationFailure failure = (JobManagerMessages.CancellationFailure) result;
-                       throw new Exception(failure.cause());
-               }
-
-               String savepointPath = ((JobManagerMessages.CancellationSuccess) result).savepointPath();
+               assertNotNull("Could not take savepoint.", savepointPath);
 
-               // Wait until canceled
-               msg = new TestingJobManagerMessages.NotifyWhenJobStatus(jobToMigrate.getJobID(), JobStatus.CANCELED);
-               Await.ready(jobManager.ask(msg, timeout), timeout);
+               CompletableFuture<JobStatus> jobCanceledFuture = FutureUtils.retrySuccesfulWithDelay(
+                       () -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
+                       Time.milliseconds(50),
+                       deadline,
+                       (jobStatus) -> jobStatus == JobStatus.CANCELED,
+                       TestingUtils.defaultScheduledExecutor());
+               assertEquals(
+                       JobStatus.CANCELED,
+                       jobCanceledFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
 
                return savepointPath;
        }
 
-       private void restoreJob(String savepointPath) throws Exception {
+       private void restoreJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline, String savepointPath) throws Exception {
                JobGraph jobToRestore = createJobGraph(ExecutionMode.RESTORE);
                jobToRestore.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState));
 
-               Object msg;
-               Object result;
-
-               // Submit job graph
-               msg = new JobManagerMessages.SubmitJob(jobToRestore, ListeningBehaviour.DETACHED);
-               result = Await.result(jobManager.ask(msg, timeout), timeout);
+               assertNotNull("Job doesn't have a JobID.", jobToRestore.getJobID());
 
-               if (result instanceof JobManagerMessages.JobResultFailure) {
-                       JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result;
-                       throw new Exception(failure.cause());
-               }
-               Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
-
-               msg = new JobManagerMessages.RequestJobStatus(jobToRestore.getJobID());
-               JobStatus status = ((JobManagerMessages.CurrentJobStatus) Await.result(jobManager.ask(msg, timeout), timeout)).status();
-               while (!status.isTerminalState()) {
-                       status = ((JobManagerMessages.CurrentJobStatus) Await.result(jobManager.ask(msg, timeout), timeout)).status();
-               }
+               clusterClient.submitJob(jobToRestore, classLoader);
 
-               Assert.assertEquals(JobStatus.FINISHED, status);
+               CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
+                       () -> clusterClient.getJobStatus(jobToRestore.getJobID()),
+                       Time.milliseconds(50),
+                       deadline,
+                       (jobStatus) -> jobStatus == JobStatus.FINISHED,
+                       TestingUtils.defaultScheduledExecutor());
+               assertEquals(
+                       JobStatus.FINISHED,
+                       jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
        }
 
        private JobGraph createJobGraph(ExecutionMode mode) {
-               StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
                env.setRestartStrategy(RestartStrategies.noRestart());
-               env.setStateBackend(new MemoryStateBackend());
+               env.setStateBackend((StateBackend) new MemoryStateBackend());
 
                switch (mode) {
                        case MIGRATE:

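For reference, the status-polling idiom that replaces the old Akka "NotifyWhenJobStatus"/"RequestJobStatus" messages can be factored into a small helper. This is a hypothetical sketch, not part of the commit; it only uses calls that appear in the diff above (FutureUtils.retrySuccesfulWithDelay, ClusterClient#getJobStatus, TestingUtils.defaultScheduledExecutor).

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.api.common.time.Deadline;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.client.program.ClusterClient;
    import org.apache.flink.runtime.concurrent.FutureUtils;
    import org.apache.flink.runtime.jobgraph.JobStatus;
    import org.apache.flink.runtime.testingUtils.TestingUtils;

    final class JobStatusPolling {

        /** Polls the job status every 50 ms until it matches the expected status or the deadline expires. */
        static JobStatus waitForJobStatus(
                ClusterClient<?> clusterClient,
                JobID jobId,
                JobStatus expected,
                Deadline deadline) throws Exception {

            CompletableFuture<JobStatus> statusFuture = FutureUtils.retrySuccesfulWithDelay(
                () -> clusterClient.getJobStatus(jobId),
                Time.milliseconds(50),
                deadline,
                (jobStatus) -> jobStatus == expected,
                TestingUtils.defaultScheduledExecutor());

            return statusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
        }

        private JobStatusPolling() {
        }
    }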