[FLINK-8703][tests] Port CancelingTestBase to MiniClusterResource

This closes #5664.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/20d7af77
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/20d7af77
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/20d7af77

Branch: refs/heads/release-1.5
Commit: 20d7af77005f831f1cadf3769e0a9a059b9f37d6
Parents: b1a7e4f
Author: zentol <ches...@apache.org>
Authored: Mon Feb 26 15:36:37 2018 +0100
Committer: zentol <ches...@apache.org>
Committed: Wed Apr 4 08:59:18 2018 +0200

----------------------------------------------------------------------
 .../test/cancelling/CancelingTestBase.java      | 133 ++++++-------------
 .../test/cancelling/JoinCancelingITCase.java    |   9 +-
 .../test/cancelling/MapCancelingITCase.java     |   7 +-
 3 files changed, 45 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/20d7af77/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index 03ca649..cac16f0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.test.cancelling;
 
+import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -28,150 +29,100 @@ import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
+import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.util.TestLogger;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.junit.ClassRule;
 
 import java.util.concurrent.TimeUnit;
 
-import scala.concurrent.Await;
-import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
-import static org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
-import static 
org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
-import static 
org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
-
 /**
  * Base class for testing job cancellation.
  */
 public abstract class CancelingTestBase extends TestLogger {
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(CancelingTestBase.class);
-
        private static final int MINIMUM_HEAP_SIZE_MB = 192;
 
-       /**
-        * Defines the number of seconds after which an issued cancel request 
is expected to have taken effect (i.e. the job
-        * is canceled), starting from the point in time when the cancel 
request is issued.
-        */
-       private static final int DEFAULT_CANCEL_FINISHED_INTERVAL = 10 * 1000;
-
-       private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = 1;
+       protected static final int PARALLELISM = 4;
 
        // --------------------------------------------------------------------------------------------
 
-       protected LocalFlinkMiniCluster executor;
-
-       protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
+       @ClassRule
+       public static final MiniClusterResource CLUSTER = new 
MiniClusterResource(
+               new MiniClusterResource.MiniClusterResourceConfiguration(
+                       getConfiguration(),
+                       2,
+                       4),
+               true);
 
        // --------------------------------------------------------------------------------------------
 
-       private void verifyJvmOptions() {
+       private static void verifyJvmOptions() {
                final long heap = Runtime.getRuntime().maxMemory() >> 20;
                Assert.assertTrue("Insufficient java heap space " + heap + "mb 
- set JVM option: -Xmx" + MINIMUM_HEAP_SIZE_MB
                                + "m", heap > MINIMUM_HEAP_SIZE_MB - 50);
        }
 
-       @Before
-       public void startCluster() throws Exception {
+       private static Configuration getConfiguration() {
                verifyJvmOptions();
                Configuration config = new Configuration();
                config.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
-               config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
-               config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
4);
                config.setString(AkkaOptions.ASK_TIMEOUT, 
TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
                config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
                config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 2048);
 
-               this.executor = new LocalFlinkMiniCluster(config, false);
-               this.executor.start();
-       }
-
-       @After
-       public void stopCluster() throws Exception {
-               if (this.executor != null) {
-                       this.executor.stop();
-                       this.executor = null;
-                       FileSystem.closeAll();
-                       System.gc();
-               }
+               return config;
        }
 
        // --------------------------------------------------------------------------------------------
 
-       public void runAndCancelJob(Plan plan, int msecsTillCanceling) throws 
Exception {
-               runAndCancelJob(plan, msecsTillCanceling, 
DEFAULT_CANCEL_FINISHED_INTERVAL);
-       }
-
-       public void runAndCancelJob(Plan plan, final int msecsTillCanceling, 
int maxTimeTillCanceled) throws Exception {
-               try {
-                       // submit job
-                       final JobGraph jobGraph = getJobGraph(plan);
-
-                       executor.submitJobDetached(jobGraph);
+       protected void runAndCancelJob(Plan plan, final int msecsTillCanceling, 
int maxTimeTillCanceled) throws Exception {
+               // submit job
+               final JobGraph jobGraph = getJobGraph(plan);
 
-                       // Wait for the job to make some progress and then 
cancel
-                       
JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), 
JobStatus.RUNNING,
-                                       
executor.getLeaderGateway(TestingUtils.TESTING_DURATION()),
-                                       TestingUtils.TESTING_DURATION());
+               ClusterClient<?> client = CLUSTER.getClusterClient();
+               client.setDetached(true);
 
-                       Thread.sleep(msecsTillCanceling);
+               JobSubmissionResult jobSubmissionResult = 
client.submitJob(jobGraph, CancelingTestBase.class.getClassLoader());
 
-                       FiniteDuration timeout = new 
FiniteDuration(maxTimeTillCanceled, TimeUnit.MILLISECONDS);
+               Deadline submissionDeadLine = new FiniteDuration(2, 
TimeUnit.MINUTES).fromNow();
 
-                       ActorGateway jobManager = 
executor.getLeaderGateway(TestingUtils.TESTING_DURATION());
+               JobStatus jobStatus = 
client.getJobStatus(jobSubmissionResult.getJobID()).get(submissionDeadLine.timeLeft().toMillis(),
 TimeUnit.MILLISECONDS);
+               while (jobStatus != JobStatus.RUNNING && 
submissionDeadLine.hasTimeLeft()) {
+                       Thread.sleep(50);
+                       jobStatus = 
client.getJobStatus(jobSubmissionResult.getJobID()).get(submissionDeadLine.timeLeft().toMillis(),
 TimeUnit.MILLISECONDS);
+               }
+               if (jobStatus != JobStatus.RUNNING) {
+                       Assert.fail("Job not in state RUNNING.");
+               }
 
-                       Future<Object> ask = jobManager.ask(new 
CancelJob(jobGraph.getJobID()), timeout);
+               Thread.sleep(msecsTillCanceling);
 
-                       Object result = Await.result(ask, timeout);
+               client.cancel(jobSubmissionResult.getJobID());
 
-                       if (result instanceof CancellationSuccess) {
-                               // all good
-                       } else if (result instanceof CancellationFailure) {
-                               // Failure
-                               CancellationFailure failure = 
(CancellationFailure) result;
-                               throw new Exception("Failed to cancel job with 
ID " + failure.jobID() + ".",
-                                               failure.cause());
-                       } else {
-                               throw new Exception("Unexpected response to 
cancel request: " + result);
-                       }
+               Deadline cancelDeadline = new 
FiniteDuration(maxTimeTillCanceled, TimeUnit.MILLISECONDS).fromNow();
 
-                       // Wait for the job to be cancelled
-                       
JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), 
JobStatus.CANCELED,
-                                       
executor.getLeaderGateway(TestingUtils.TESTING_DURATION()),
-                                       TestingUtils.TESTING_DURATION());
+               JobStatus jobStatusAfterCancel = 
client.getJobStatus(jobSubmissionResult.getJobID()).get(cancelDeadline.timeLeft().toMillis(),
 TimeUnit.MILLISECONDS);
+               while (jobStatusAfterCancel != JobStatus.CANCELED && 
cancelDeadline.hasTimeLeft()) {
+                       Thread.sleep(50);
+                       jobStatusAfterCancel = 
client.getJobStatus(jobSubmissionResult.getJobID()).get(cancelDeadline.timeLeft().toMillis(),
 TimeUnit.MILLISECONDS);
                }
-               catch (Exception e) {
-                       LOG.error("Exception found in runAndCancelJob.", e);
-                       e.printStackTrace();
-                       Assert.fail(e.getMessage());
+               if (jobStatusAfterCancel != JobStatus.CANCELED) {
+                       Assert.fail("Failed to cancel job with ID " + 
jobSubmissionResult.getJobID() + '.');
                }
        }
 
-       private JobGraph getJobGraph(final Plan plan) throws Exception {
-               final Optimizer pc = new Optimizer(new DataStatistics(), 
this.executor.configuration());
+       private JobGraph getJobGraph(final Plan plan) {
+               final Optimizer pc = new Optimizer(new DataStatistics(), 
getConfiguration());
                final OptimizedPlan op = pc.compile(plan);
                final JobGraphGenerator jgg = new JobGraphGenerator();
                return jgg.compileJobGraph(op);
        }
-
-       public void setTaskManagerNumSlots(int taskManagerNumSlots) {
-               this.taskManagerNumSlots = taskManagerNumSlots;
-       }
-
-       public int getTaskManagerNumSlots() {
-               return this.taskManagerNumSlots;
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/20d7af77/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
index 5e21129..66919e7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
@@ -34,15 +34,10 @@ import org.apache.flink.test.util.UniformIntTupleGeneratorInputFormat;
  * Test job cancellation from within a JoinFunction.
  */
 public class JoinCancelingITCase extends CancelingTestBase {
-       private static final int parallelism = 4;
-
-       public JoinCancelingITCase() {
-               setTaskManagerNumSlots(parallelism);
-       }
 
        // --------------- Test Sort Matches that are canceled while still 
reading / sorting -----------------
        private void executeTask(JoinFunction<Tuple2<Integer, Integer>, 
Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> joiner, boolean slow) 
throws Exception {
-               executeTask(joiner, slow, parallelism);
+               executeTask(joiner, slow, PARALLELISM);
        }
 
        private void executeTask(JoinFunction<Tuple2<Integer, Integer>, 
Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> joiner, boolean slow, int 
parallelism) throws Exception {
@@ -90,7 +85,7 @@ public class JoinCancelingITCase extends CancelingTestBase {
                                .with(joiner)
                                .output(new 
DiscardingOutputFormat<Tuple2<Integer, Integer>>());
 
-               env.setParallelism(parallelism);
+               env.setParallelism(PARALLELISM);
 
                runAndCancelJob(env.createProgramPlan(), msecsTillCanceling, 
maxTimeTillCanceled);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/20d7af77/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
index 3a7039f..13edea4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
@@ -31,11 +31,6 @@ import org.junit.Test;
  * Test job cancellation from within a MapFunction.
  */
 public class MapCancelingITCase extends CancelingTestBase {
-       private static final int parallelism = 4;
-
-       public MapCancelingITCase() {
-               setTaskManagerNumSlots(parallelism);
-       }
 
        @Test
        public void testMapCancelling() throws Exception {
@@ -65,7 +60,7 @@ public class MapCancelingITCase extends CancelingTestBase {
                                .map(mapper)
                                .output(new DiscardingOutputFormat<Integer>());
 
-               env.setParallelism(parallelism);
+               env.setParallelism(PARALLELISM);
 
                runAndCancelJob(env.createProgramPlan(), 5 * 1000, 10 * 1000);
        }

Reply via email to