[FLINK-8703][tests] Port KafkaTestBase to MiniClusterResource

This closes #5669.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84ad2cd4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84ad2cd4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84ad2cd4

Branch: refs/heads/release-1.5
Commit: 84ad2cd4b13db2dbe4a144aa2e3a2802e79f77b9
Parents: 69b8a92
Author: zentol <ches...@apache.org>
Authored: Wed Mar 7 13:39:25 2018 +0100
Committer: zentol <ches...@apache.org>
Committed: Fri Apr 6 15:24:03 2018 +0200

----------------------------------------------------------------------
 .../connectors/kafka/KafkaConsumerTestBase.java | 219 +++++++++++--------
 .../connectors/kafka/KafkaTestBase.java         |  25 +--
 .../testutils/ClusterCommunicationUtils.java    |  56 +++++
 .../testutils/JobManagerCommunicationUtils.java | 147 -------------
 4 files changed, 186 insertions(+), 261 deletions(-)
----------------------------------------------------------------------
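
In essence, this port swaps ActorGateway-based JobManager messaging for the ClusterClient API exposed by MiniClusterResource. As a minimal sketch of the new cancellation idiom (assuming a test that runs exactly one job at a time; flink and the helper classes are the ones touched by this diff):

    // Old idiom (removed): ask the JobManager actor to cancel "the current job".
    // JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));

    // New idiom (added): resolve the single running job's ID through the client, then cancel by ID.
    ClusterClient<?> client = flink.getClusterClient();
    client.cancel(Iterables.getOnlyElement(ClusterCommunicationUtils.getRunningJobs(client)));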


http://git-wip-us.apache.org/repos/asf/flink/blob/84ad2cd4/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 959d6f1..6ed9143 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
@@ -34,6 +35,7 @@ import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.DataInputView;
@@ -42,7 +44,8 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.client.JobCancellationException;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -54,11 +57,11 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
 import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
-import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
 import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2FlinkPartitioner;
@@ -72,6 +75,9 @@ import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.testutils.junit.RetryOnException;
 import org.apache.flink.testutils.junit.RetryRule;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
 
 import kafka.consumer.Consumer;
 import kafka.consumer.ConsumerConfig;
@@ -106,10 +112,12 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.getRunningJobs;
+import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.waitUntilJobIsRunning;
+import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.waitUntilNoJobIsRunning;
 import static org.apache.flink.test.util.TestUtils.tryExecute;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -123,6 +131,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
        @Rule
        public RetryRule retryRule = new RetryRule();
 
+       private ClusterClient<?> client;
+
        // ------------------------------------------------------------------------
        //  Common Test Preparation
        // ------------------------------------------------------------------------
@@ -132,8 +142,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
         * the same mini cluster. Otherwise, missing slots may happen.
         */
        @Before
-       public void ensureNoJobIsLingering() throws Exception {
-               JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+       public void setClientAndEnsureNoJobIsLingering() throws Exception {
+               client = flink.getClusterClient();
+               waitUntilNoJobIsRunning(client);
        }
 
        // ------------------------------------------------------------------------
@@ -244,7 +255,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                while (System.nanoTime() < deadline);
 
                // cancel the job & wait for the job to finish
-               JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+               client.cancel(Iterables.getOnlyElement(getRunningJobs(client)));
                runner.join();
 
                final Throwable t = errorRef.get();
@@ -330,7 +341,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                while (System.nanoTime() < deadline);
 
                // cancel the job & wait for the job to finish
-               JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+               client.cancel(Iterables.getOnlyElement(getRunningJobs(client)));
                runner.join();
 
                final Throwable t = errorRef.get();
@@ -443,14 +454,18 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                        }).setParallelism(1)
                        .addSink(new DiscardingSink<>());
 
+               JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+               final JobID consumeJobId = jobGraph.getJobID();
+
                final AtomicReference<Throwable> error = new AtomicReference<>();
                Thread consumeThread = new Thread(new Runnable() {
                        @Override
                        public void run() {
                                try {
-                                       env.execute(consumeExtraRecordsJobName);
+                                       client.setDetached(false);
+                                       client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader());
                                } catch (Throwable t) {
-                                       if (!(t instanceof JobCancellationException)) {
+                                       if (!ExceptionUtils.findThrowable(t, JobCancellationException.class).isPresent()) {
                                                error.set(t);
                                        }
                                }
@@ -459,9 +474,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                consumeThread.start();
 
                // wait until the consuming job has started, to be extra safe
-               JobManagerCommunicationUtils.waitUntilJobIsRunning(
-                       flink.getLeaderGateway(timeout),
-                       consumeExtraRecordsJobName);
+               waitUntilJobIsRunning(client);
 
                // setup the extra records writing job
                final StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -500,9 +513,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                }
 
                // cancel the consume job after all extra records are written
-               JobManagerCommunicationUtils.cancelCurrentJob(
-                       flink.getLeaderGateway(timeout),
-                       consumeExtraRecordsJobName);
+               client.cancel(consumeJobId);
                consumeThread.join();
 
                kafkaOffsetHandler.close();
@@ -989,23 +1000,27 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
                final AtomicReference<Throwable> jobError = new AtomicReference<>();
 
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(parallelism);
+               env.enableCheckpointing(100);
+               env.getConfig().disableSysoutLogging();
+
+               Properties props = new Properties();
+               props.putAll(standardProps);
+               props.putAll(secureProps);
+               FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);
+
+               env.addSource(source).addSink(new DiscardingSink<String>());
+
+               JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+               final JobID jobId = jobGraph.getJobID();
+
                final Runnable jobRunner = new Runnable() {
                        @Override
                        public void run() {
                                try {
-                                       final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-                                       env.setParallelism(parallelism);
-                                       env.enableCheckpointing(100);
-                                       env.getConfig().disableSysoutLogging();
-
-                                       Properties props = new Properties();
-                                       props.putAll(standardProps);
-                                       props.putAll(secureProps);
-                                       FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);
-
-                                       env.addSource(source).addSink(new DiscardingSink<String>());
-
-                                       env.execute("Runner for CancelingOnFullInputTest");
+                                       client.setDetached(false);
+                                       client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader());
                                }
                                catch (Throwable t) {
                                        jobError.set(t);
@@ -1026,14 +1041,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                }
 
                // cancel
-               JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout), "Runner for CancelingOnFullInputTest");
+               client.cancel(jobId);
 
                // wait for the program to be done and validate that we failed with the right exception
                runnerThread.join();
 
-               failueCause = jobError.get();
-               assertNotNull("program did not fail properly due to canceling", failueCause);
-               assertTrue(failueCause.getMessage().contains("Job was cancelled"));
+               assertEquals(JobStatus.CANCELED, client.getJobStatus(jobId).get());
 
                if (generator.isAlive()) {
                        generator.shutdown();
@@ -1063,23 +1076,27 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
                final AtomicReference<Throwable> error = new AtomicReference<>();
 
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(parallelism);
+               env.enableCheckpointing(100);
+               env.getConfig().disableSysoutLogging();
+
+               Properties props = new Properties();
+               props.putAll(standardProps);
+               props.putAll(secureProps);
+               FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);
+
+               env.addSource(source).addSink(new DiscardingSink<String>());
+
+               JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+               final JobID jobId = jobGraph.getJobID();
+
                final Runnable jobRunner = new Runnable() {
                        @Override
                        public void run() {
                                try {
-                                       final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-                                       env.setParallelism(parallelism);
-                                       env.enableCheckpointing(100);
-                                       env.getConfig().disableSysoutLogging();
-
-                                       Properties props = new Properties();
-                                       props.putAll(standardProps);
-                                       props.putAll(secureProps);
-                                       FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);
-
-                                       env.addSource(source).addSink(new DiscardingSink<String>());
-
-                                       env.execute("CancelingOnEmptyInputTest");
+                                       client.setDetached(false);
+                                       client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader());
                                }
                                catch (Throwable t) {
                                        LOG.error("Job Runner failed with 
exception", t);
@@ -1100,14 +1117,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                        Assert.fail("Test failed prematurely with: " + 
failueCause.getMessage());
                }
                // cancel
-               JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+               client.cancel(jobId);
 
                // wait for the program to be done and validate that we failed with the right exception
                runnerThread.join();
 
-               failueCause = error.get();
-               assertNotNull("program did not fail properly due to canceling", failueCause);
-               assertTrue(failueCause.getMessage().contains("Job was cancelled"));
+               assertEquals(JobStatus.CANCELED, client.getJobStatus(jobId).get());
 
                deleteTestTopic(topic);
        }
@@ -1558,52 +1573,53 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                createTestTopic(topic, 5, 1);
 
                final Tuple1<Throwable> error = new Tuple1<>(null);
-               Runnable job = new Runnable() {
+
+               // start job writing & reading data.
+               final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();
+               env1.setParallelism(1);
+               env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+               env1.getConfig().disableSysoutLogging();
+               env1.disableOperatorChaining(); // let the source read everything into the network buffers
+
+               TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig());
+               DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
+               fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
                        @Override
-                       public void run() {
-                               try {
-                                       // start job writing & reading data.
-                                       final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();
-                                       env1.setParallelism(1);
-                                       env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-                                       env1.getConfig().disableSysoutLogging();
-                                       env1.disableOperatorChaining(); // let the source read everything into the network buffers
-
-                                       Properties props = new Properties();
-                                       props.putAll(standardProps);
-                                       props.putAll(secureProps);
-
-                                       TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig());
-                                       DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
-                                       fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
-                                               @Override
-                                               public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {// no op
-                                               }
-                                       });
-
-                                       DataStream<Tuple2<Integer, Integer>> fromGen = env1.addSource(new RichSourceFunction<Tuple2<Integer, Integer>>() {
-                                               boolean running = true;
-
-                                               @Override
-                                               public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
-                                                       int i = 0;
-                                                       while (running) {
-                                                               ctx.collect(Tuple2.of(i++, getRuntimeContext().getIndexOfThisSubtask()));
-                                                               Thread.sleep(1);
-                                                       }
-                                               }
+                       public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {// no op
+                       }
+               });
 
-                                               @Override
-                                               public void cancel() {
-                                                       running = false;
-                                               }
-                                       });
+               DataStream<Tuple2<Integer, Integer>> fromGen = env1.addSource(new RichSourceFunction<Tuple2<Integer, Integer>>() {
+                       boolean running = true;
+
+                       @Override
+                       public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+                               int i = 0;
+                               while (running) {
+                                       ctx.collect(Tuple2.of(i++, getRuntimeContext().getIndexOfThisSubtask()));
+                                       Thread.sleep(1);
+                               }
+                       }
+
+                       @Override
+                       public void cancel() {
+                               running = false;
+                       }
+               });
+
+               kafkaServer.produceIntoKafka(fromGen, topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null);
 
-                                       kafkaServer.produceIntoKafka(fromGen, topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null);
+               JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env1.getStreamGraph());
+               final JobID jobId = jobGraph.getJobID();
 
-                                       env1.execute("Metrics test job");
+               Runnable job = new Runnable() {
+                       @Override
+                       public void run() {
+                               try {
+                                       client.setDetached(false);
+                                       client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader());
                                } catch (Throwable t) {
-                                       if (!(t instanceof JobCancellationException)) { // we'll cancel the job
+                                       if (!ExceptionUtils.findThrowable(t, JobCancellationException.class).isPresent()) {
                                                LOG.warn("Got exception during execution", t);
                                                error.f0 = t;
                                        }
@@ -1653,7 +1669,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                        LOG.info("Found all JMX metrics. Cancelling job.");
                } finally {
                        // cancel
-                       JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+                       client.cancel(jobId);
                        // wait for the job to finish (it should due to the cancel command above)
                        jobThread.join();
                }
@@ -1903,7 +1919,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                        catch (Exception e) {
                                LOG.error("Write attempt failed, trying again", 
e);
                                deleteTestTopic(topicName);
-                               JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+                               waitUntilNoJobIsRunning(client);
                                continue;
                        }
 
@@ -1914,7 +1930,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                        // we need to validate the sequence, because kafka's producers are not exactly once
                        LOG.info("Validating sequence");
 
-                       JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+                       waitUntilNoJobIsRunning(client);
 
                        if (validateSequence(topicName, parallelism, deserSchema, numElements)) {
                                // everything is good!
@@ -1996,7 +2012,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
                // we need to validate the sequence, because kafka's producers are not exactly once
                LOG.info("Validating sequence");
-               JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+               while (!getRunningJobs(client).isEmpty()){
+                       Thread.sleep(50);
+               }
 
                if (!validateSequence(topicName, parallelism, deserSchema, originalNumElements + numElementsToAppend)) {
                        throw new Exception("Could not append a valid sequence 
to Kafka.");
@@ -2040,13 +2058,20 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
                final AtomicReference<Throwable> errorRef = new AtomicReference<>();
 
+               JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(readEnv.getStreamGraph());
+               final JobID jobId = jobGraph.getJobID();
+
                Thread runner = new Thread() {
                        @Override
                        public void run() {
                                try {
+                                       client.setDetached(false);
+                                       client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader());
                                        tryExecute(readEnv, "sequence validation");
                                } catch (Throwable t) {
-                                       errorRef.set(t);
+                                       if (!ExceptionUtils.findThrowable(t, SuccessException.class).isPresent()) {
+                                               errorRef.set(t);
+                                       }
                                }
                        }
                };
@@ -2064,7 +2089,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                        // did not finish in time, maybe the producer dropped one or more records and
                        // the validation did not reach the exit point
                        success = false;
-                       JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+                       client.cancel(jobId);
                }
                else {
                        Throwable error = errorRef.get();
@@ -2077,7 +2102,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                        }
                }
 
-               JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+               waitUntilNoJobIsRunning(client);
 
                return success;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/84ad2cd4/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index f471cd4..697e075 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -24,9 +24,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.metrics.jmx.JMXReporter;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
@@ -75,13 +75,17 @@ public abstract class KafkaTestBase extends TestLogger {
 
        protected static final int TM_SLOTS = 8;
 
-       protected static final int PARALLELISM = NUM_TMS * TM_SLOTS;
-
        protected static String brokerConnectionStrings;
 
        protected static Properties standardProps;
 
-       protected static LocalFlinkMiniCluster flink;
+       @ClassRule
+       public static MiniClusterResource flink = new MiniClusterResource(
+               new MiniClusterResource.MiniClusterResourceConfiguration(
+                       getFlinkConfiguration(),
+                       NUM_TMS,
+                       TM_SLOTS),
+               true);
 
        protected static FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
 
@@ -107,8 +111,6 @@ public abstract class KafkaTestBase extends TestLogger {
                LOG.info("-------------------------------------------------------------------------");
 
                startClusters(false, hideKafkaBehindProxy);
-
-               TestStreamEnvironment.setAsContext(flink, PARALLELISM);
        }
 
        @AfterClass
@@ -131,8 +133,6 @@ public abstract class KafkaTestBase extends TestLogger {
                Configuration flinkConfig = new Configuration();
                flinkConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "5 s");
                flinkConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1 
s");
-               
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
-               
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
                flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
16L);
                
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 
s");
                flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
JMXReporter.class.getName());
@@ -163,18 +163,9 @@ public abstract class KafkaTestBase extends TestLogger {
                        }
                        secureProps = kafkaServer.getSecureProperties();
                }
-
-               // start also a re-usable Flink mini cluster
-               flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
-               flink.start();
        }
 
        protected static void shutdownClusters() throws Exception {
-
-               if (flink != null) {
-                       flink.stop();
-               }
-
                if (secureProps != null) {
                        secureProps.clear();
                }
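
With the @ClassRule in place, JUnit drives the cluster lifecycle, which is why the manual LocalFlinkMiniCluster start/stop code and the explicit TestStreamEnvironment.setAsContext() call disappear. A sketch of what a subclass now relies on (the meaning of the trailing boolean is inferred here, not stated in the diff):

    @ClassRule
    public static MiniClusterResource flink = new MiniClusterResource(
        new MiniClusterResource.MiniClusterResourceConfiguration(
            getFlinkConfiguration(), // TM count and slots are constructor arguments now,
            NUM_TMS,                 // no longer Configuration entries
            TM_SLOTS),
        true); // presumably enables the ClusterClient used via flink.getClusterClient()

    // Tests talk to the cluster through a client instead of an actor gateway:
    ClusterClient<?> client = flink.getClusterClient();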

http://git-wip-us.apache.org/repos/asf/flink/blob/84ad2cd4/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ClusterCommunicationUtils.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ClusterCommunicationUtils.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ClusterCommunicationUtils.java
new file mode 100644
index 0000000..41f9d1e
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ClusterCommunicationUtils.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Utilities for communicating with a cluster through a {@link ClusterClient}.
+ */
+public class ClusterCommunicationUtils {
+
+       public static void waitUntilJobIsRunning(ClusterClient<?> client) throws Exception {
+               while (getRunningJobs(client).isEmpty()) {
+                       Thread.sleep(50);
+               }
+       }
+
+       public static void waitUntilNoJobIsRunning(ClusterClient<?> client) throws Exception {
+               while (!getRunningJobs(client).isEmpty()) {
+                       Thread.sleep(50);
+               }
+       }
+
+       public static List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception {
+               Collection<JobStatusMessage> statusMessages = client.listJobs().get();
+               return statusMessages.stream()
+                       .filter(status -> !status.getJobState().isGloballyTerminalState())
+                       .map(JobStatusMessage::getJobId)
+                       .collect(Collectors.toList());
+       }
+
+       private ClusterCommunicationUtils() {
+       }
+}
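
The new helpers poll ClusterClient#listJobs() rather than asking a JobManager actor, and they count a job as running while its state is not globally terminal. Illustrative usage inside a test, assuming static imports of the three methods:

    waitUntilNoJobIsRunning(client);  // barrier: nothing lingers from a previous test
    // ... submit the job under test ...
    waitUntilJobIsRunning(client);    // returns once at least one job is non-terminal
    client.cancel(Iterables.getOnlyElement(getRunningJobs(client))); // safe when exactly one job runs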

http://git-wip-us.apache.org/repos/asf/flink/blob/84ad2cd4/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
deleted file mode 100644
index 9bbe1d3..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * Utilities for communicating with a jobmanager through a {@link ActorGateway}.
- */
-public class JobManagerCommunicationUtils {
-
-       private static final FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
-
-       public static void waitUntilNoJobIsRunning(ActorGateway jobManager) throws Exception {
-               while (true) {
-                       // find the jobID
-                       Future<Object> listResponse = jobManager.ask(
-                                       JobManagerMessages.getRequestRunningJobsStatus(), askTimeout);
-
-                       Object result = Await.result(listResponse, askTimeout);
-                       List<JobStatusMessage> jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
-
-                       if (jobs.isEmpty()) {
-                               return;
-                       }
-
-                       Thread.sleep(50);
-               }
-       }
-
-       public static void waitUntilJobIsRunning(ActorGateway jobManager, String name) throws Exception {
-               while (true) {
-                       Future<Object> listResponse = jobManager.ask(
-                               JobManagerMessages.getRequestRunningJobsStatus(),
-                               askTimeout);
-
-                       List<JobStatusMessage> jobs;
-                       try {
-                               Object result = Await.result(listResponse, askTimeout);
-                               jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
-                       }
-                       catch (Exception e) {
-                               throw new Exception("Could not wait for job to start - failed to retrieve running jobs from the JobManager.", e);
-                       }
-
-                       // see if the running jobs contain the requested job
-                       for (JobStatusMessage job : jobs) {
-                               if (job.getJobName().equals(name)) {
-                                       return;
-                               }
-                       }
-
-                       Thread.sleep(50);
-               }
-       }
-
-       public static void cancelCurrentJob(ActorGateway jobManager) throws Exception {
-               cancelCurrentJob(jobManager, null);
-       }
-
-       public static void cancelCurrentJob(ActorGateway jobManager, String name) throws Exception {
-               JobStatusMessage status = null;
-
-               for (int i = 0; i < 200; i++) {
-                       // find the jobID
-                       Future<Object> listResponse = jobManager.ask(
-                                       JobManagerMessages.getRequestRunningJobsStatus(),
-                                       askTimeout);
-
-                       List<JobStatusMessage> jobs;
-                       try {
-                               Object result = Await.result(listResponse, askTimeout);
-                               jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
-                       }
-                       catch (Exception e) {
-                               throw new Exception("Could not cancel job - failed to retrieve running jobs from the JobManager.", e);
-                       }
-
-                       if (jobs.isEmpty()) {
-                               // try again, fall through the loop
-                               Thread.sleep(50);
-                       }
-                       else if (jobs.size() == 1) {
-                               status = jobs.get(0);
-                       }
-                       else if (name != null) {
-                               for (JobStatusMessage msg: jobs) {
-                                       if (msg.getJobName().equals(name)) {
-                                               status = msg;
-                                       }
-                               }
-                               if (status == null) {
-                                       throw new Exception("Could not cancel job - no job matched expected name = '" + name + "' in " + jobs);
-                               }
-                       } else {
-                               String jobNames = "";
-                               for (JobStatusMessage jsm: jobs) {
-                                       jobNames += jsm.getJobName() + ", ";
-                               }
-                               throw new Exception("Could not cancel job - more than one running job: " + jobNames);
-                       }
-               }
-
-               if (status == null) {
-                       throw new Exception("Could not cancel job - no running jobs");
-               }
-               else if (status.getJobState().isGloballyTerminalState()) {
-                       throw new Exception("Could not cancel job - job is not running any more");
-               }
-
-               JobID jobId = status.getJobId();
-
-               Future<Object> response = jobManager.ask(new JobManagerMessages.CancelJob(jobId), askTimeout);
-               try {
-                       Await.result(response, askTimeout);
-               }
-               catch (Exception e) {
-                       throw new Exception("Sending the 'cancel' message failed.", e);
-               }
-       }
-}
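
For reference, each removed helper maps onto a ClusterClient-based replacement (a rough correspondence, not part of the commit):

    // waitUntilNoJobIsRunning(jobManager)      -> ClusterCommunicationUtils.waitUntilNoJobIsRunning(client)
    // waitUntilJobIsRunning(jobManager, name)  -> ClusterCommunicationUtils.waitUntilJobIsRunning(client)
    // cancelCurrentJob(jobManager[, name])     -> client.cancel(jobId), with the JobID known up front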
