[FLINK-8703][tests] Port CancelingTestBase to MiniClusterResource

This closes #5664.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/20d7af77 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/20d7af77 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/20d7af77 Branch: refs/heads/release-1.5 Commit: 20d7af77005f831f1cadf3769e0a9a059b9f37d6 Parents: b1a7e4f Author: zentol <ches...@apache.org> Authored: Mon Feb 26 15:36:37 2018 +0100 Committer: zentol <ches...@apache.org> Committed: Wed Apr 4 08:59:18 2018 +0200 ---------------------------------------------------------------------- .../test/cancelling/CancelingTestBase.java | 133 ++++++------------- .../test/cancelling/JoinCancelingITCase.java | 9 +- .../test/cancelling/MapCancelingITCase.java | 7 +- 3 files changed, 45 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/20d7af77/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java index 03ca649..cac16f0 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java @@ -18,9 +18,10 @@ package org.apache.flink.test.cancelling; +import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.api.common.Plan; +import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.AkkaOptions; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.TaskManagerOptions; @@ -28,150 +29,100 @@ import 
org.apache.flink.optimizer.DataStatistics; import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; -import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.runtime.testutils.JobManagerActorTestUtils; +import org.apache.flink.test.util.MiniClusterResource; import org.apache.flink.util.TestLogger; -import org.apache.hadoop.fs.FileSystem; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.junit.ClassRule; import java.util.concurrent.TimeUnit; -import scala.concurrent.Await; -import scala.concurrent.Future; +import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; -import static org.apache.flink.runtime.messages.JobManagerMessages.CancelJob; -import static org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure; -import static org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess; - /** * Base class for testing job cancellation. */ public abstract class CancelingTestBase extends TestLogger { - private static final Logger LOG = LoggerFactory.getLogger(CancelingTestBase.class); - private static final int MINIMUM_HEAP_SIZE_MB = 192; - /** - * Defines the number of seconds after which an issued cancel request is expected to have taken effect (i.e. the job - * is canceled), starting from the point in time when the cancel request is issued. 
- */ - private static final int DEFAULT_CANCEL_FINISHED_INTERVAL = 10 * 1000; - - private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = 1; + protected static final int PARALLELISM = 4; // -------------------------------------------------------------------------------------------- - protected LocalFlinkMiniCluster executor; - - protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS; + @ClassRule + public static final MiniClusterResource CLUSTER = new MiniClusterResource( + new MiniClusterResource.MiniClusterResourceConfiguration( + getConfiguration(), + 2, + 4), + true); // -------------------------------------------------------------------------------------------- - private void verifyJvmOptions() { + private static void verifyJvmOptions() { final long heap = Runtime.getRuntime().maxMemory() >> 20; Assert.assertTrue("Insufficient java heap space " + heap + "mb - set JVM option: -Xmx" + MINIMUM_HEAP_SIZE_MB + "m", heap > MINIMUM_HEAP_SIZE_MB - 50); } - @Before - public void startCluster() throws Exception { + private static Configuration getConfiguration() { verifyJvmOptions(); Configuration config = new Configuration(); config.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4); config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT()); config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096); config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 2048); - this.executor = new LocalFlinkMiniCluster(config, false); - this.executor.start(); - } - - @After - public void stopCluster() throws Exception { - if (this.executor != null) { - this.executor.stop(); - this.executor = null; - FileSystem.closeAll(); - System.gc(); - } + return config; } // -------------------------------------------------------------------------------------------- - public void runAndCancelJob(Plan plan, int 
msecsTillCanceling) throws Exception { - runAndCancelJob(plan, msecsTillCanceling, DEFAULT_CANCEL_FINISHED_INTERVAL); - } - - public void runAndCancelJob(Plan plan, final int msecsTillCanceling, int maxTimeTillCanceled) throws Exception { - try { - // submit job - final JobGraph jobGraph = getJobGraph(plan); - - executor.submitJobDetached(jobGraph); + protected void runAndCancelJob(Plan plan, final int msecsTillCanceling, int maxTimeTillCanceled) throws Exception { + // submit job + final JobGraph jobGraph = getJobGraph(plan); - // Wait for the job to make some progress and then cancel - JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.RUNNING, - executor.getLeaderGateway(TestingUtils.TESTING_DURATION()), - TestingUtils.TESTING_DURATION()); + ClusterClient<?> client = CLUSTER.getClusterClient(); + client.setDetached(true); - Thread.sleep(msecsTillCanceling); + JobSubmissionResult jobSubmissionResult = client.submitJob(jobGraph, CancelingTestBase.class.getClassLoader()); - FiniteDuration timeout = new FiniteDuration(maxTimeTillCanceled, TimeUnit.MILLISECONDS); + Deadline submissionDeadLine = new FiniteDuration(2, TimeUnit.MINUTES).fromNow(); - ActorGateway jobManager = executor.getLeaderGateway(TestingUtils.TESTING_DURATION()); + JobStatus jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(submissionDeadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + while (jobStatus != JobStatus.RUNNING && submissionDeadLine.hasTimeLeft()) { + Thread.sleep(50); + jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(submissionDeadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + } + if (jobStatus != JobStatus.RUNNING) { + Assert.fail("Job not in state RUNNING."); + } - Future<Object> ask = jobManager.ask(new CancelJob(jobGraph.getJobID()), timeout); + Thread.sleep(msecsTillCanceling); - Object result = Await.result(ask, timeout); + client.cancel(jobSubmissionResult.getJobID()); - if (result instanceof 
CancellationSuccess) { - // all good - } else if (result instanceof CancellationFailure) { - // Failure - CancellationFailure failure = (CancellationFailure) result; - throw new Exception("Failed to cancel job with ID " + failure.jobID() + ".", - failure.cause()); - } else { - throw new Exception("Unexpected response to cancel request: " + result); - } + Deadline cancelDeadline = new FiniteDuration(maxTimeTillCanceled, TimeUnit.MILLISECONDS).fromNow(); - // Wait for the job to be cancelled - JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.CANCELED, - executor.getLeaderGateway(TestingUtils.TESTING_DURATION()), - TestingUtils.TESTING_DURATION()); + JobStatus jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(cancelDeadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + while (jobStatusAfterCancel != JobStatus.CANCELED && cancelDeadline.hasTimeLeft()) { + Thread.sleep(50); + jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(cancelDeadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); } - catch (Exception e) { - LOG.error("Exception found in runAndCancelJob.", e); - e.printStackTrace(); - Assert.fail(e.getMessage()); + if (jobStatusAfterCancel != JobStatus.CANCELED) { + Assert.fail("Failed to cancel job with ID " + jobSubmissionResult.getJobID() + '.'); } } - private JobGraph getJobGraph(final Plan plan) throws Exception { - final Optimizer pc = new Optimizer(new DataStatistics(), this.executor.configuration()); + private JobGraph getJobGraph(final Plan plan) { + final Optimizer pc = new Optimizer(new DataStatistics(), getConfiguration()); final OptimizedPlan op = pc.compile(plan); final JobGraphGenerator jgg = new JobGraphGenerator(); return jgg.compileJobGraph(op); } - - public void setTaskManagerNumSlots(int taskManagerNumSlots) { - this.taskManagerNumSlots = taskManagerNumSlots; - } - - public int getTaskManagerNumSlots() { - return this.taskManagerNumSlots; - } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/20d7af77/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java index 5e21129..66919e7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java @@ -34,15 +34,10 @@ import org.apache.flink.test.util.UniformIntTupleGeneratorInputFormat; * Test job cancellation from within a JoinFunction. */ public class JoinCancelingITCase extends CancelingTestBase { - private static final int parallelism = 4; - - public JoinCancelingITCase() { - setTaskManagerNumSlots(parallelism); - } // --------------- Test Sort Matches that are canceled while still reading / sorting ----------------- private void executeTask(JoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> joiner, boolean slow) throws Exception { - executeTask(joiner, slow, parallelism); + executeTask(joiner, slow, PARALLELISM); } private void executeTask(JoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> joiner, boolean slow, int parallelism) throws Exception { @@ -90,7 +85,7 @@ public class JoinCancelingITCase extends CancelingTestBase { .with(joiner) .output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>()); - env.setParallelism(parallelism); + env.setParallelism(PARALLELISM); runAndCancelJob(env.createProgramPlan(), msecsTillCanceling, maxTimeTillCanceled); } http://git-wip-us.apache.org/repos/asf/flink/blob/20d7af77/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java ---------------------------------------------------------------------- diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java index 3a7039f..13edea4 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java @@ -31,11 +31,6 @@ import org.junit.Test; * Test job cancellation from within a MapFunction. */ public class MapCancelingITCase extends CancelingTestBase { - private static final int parallelism = 4; - - public MapCancelingITCase() { - setTaskManagerNumSlots(parallelism); - } @Test public void testMapCancelling() throws Exception { @@ -65,7 +60,7 @@ public class MapCancelingITCase extends CancelingTestBase { .map(mapper) .output(new DiscardingOutputFormat<Integer>()); - env.setParallelism(parallelism); + env.setParallelism(PARALLELISM); runAndCancelJob(env.createProgramPlan(), 5 * 1000, 10 * 1000); }