Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5664#discussion_r177794734
--- Diff:
flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
---
@@ -18,160 +18,111 @@
package org.apache.flink.test.cancelling;
+import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
+import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
+import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.util.TestLogger;
-import org.apache.hadoop.fs.FileSystem;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.junit.ClassRule;
import java.util.concurrent.TimeUnit;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
-import static
org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
-import static
org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
-import static
org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
-
/**
* Base class for testing job cancellation.
*/
public abstract class CancelingTestBase extends TestLogger {
- private static final Logger LOG =
LoggerFactory.getLogger(CancelingTestBase.class);
-
private static final int MINIMUM_HEAP_SIZE_MB = 192;
- /**
- * Defines the number of seconds after which an issued cancel request
is expected to have taken effect (i.e. the job
- * is canceled), starting from the point in time when the cancel
request is issued.
- */
- private static final int DEFAULT_CANCEL_FINISHED_INTERVAL = 10 * 1000;
-
- private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = 1;
+ protected static final int PARALLELISM = 4;
//
--------------------------------------------------------------------------------------------
- protected LocalFlinkMiniCluster executor;
-
- protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
+ @ClassRule
+ public static final MiniClusterResource CLUSTER = new
MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ getConfiguration(),
+ 2,
--- End diff --
Why do we start a mini cluster with 2 TMs and 4 slots per TM? Wouldn't a
single TM be sufficient?
---