[FLINK-8703][tests] Port KafkaShortRetentionTestBase to MiniClusterResource

This closes #5666.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ba43d6bc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ba43d6bc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ba43d6bc

Branch: refs/heads/release-1.5
Commit: ba43d6bc586bb47c30c2b70f33cadf038f0323ef
Parents: 882fc65
Author: zentol <[email protected]>
Authored: Tue Feb 27 11:11:59 2018 +0100
Committer: zentol <[email protected]>
Committed: Wed Mar 14 20:47:28 2018 +0100

----------------------------------------------------------------------
 .../kafka/KafkaShortRetentionTestBase.java      | 41 ++++++++------------
 1 file changed, 17 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ba43d6bc/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index de72985..15d972f 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -24,15 +24,14 @@ import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.junit.AfterClass;
@@ -68,21 +67,32 @@ public class KafkaShortRetentionTestBase implements Serializable {
 
        private static KafkaTestEnvironment kafkaServer;
        private static Properties standardProps;
-       private static LocalFlinkMiniCluster flink;
+
+       @ClassRule
+       public static MiniClusterResource flink = new MiniClusterResource(
+               new MiniClusterResource.MiniClusterResourceConfiguration(
+                       getConfiguration(),
+                       NUM_TMS,
+                       TM_SLOTS));
 
        @ClassRule
        public static TemporaryFolder tempFolder = new TemporaryFolder();
 
        protected static Properties secureProps = new Properties();
 
+       private static Configuration getConfiguration() {
+               Configuration flinkConfig = new Configuration();
+               flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
16L);
+               
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 
s");
+               return flinkConfig;
+       }
+
        @BeforeClass
-       public static void prepare() throws IOException, ClassNotFoundException 
{
+       public static void prepare() throws ClassNotFoundException {
                
LOG.info("-------------------------------------------------------------------------");
                LOG.info("    Starting KafkaShortRetentionTestBase ");
                
LOG.info("-------------------------------------------------------------------------");
 
-               Configuration flinkConfig = new Configuration();
-
                // dynamically load the implementation for the test
                Class<?> clazz = 
Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
                kafkaServer = (KafkaTestEnvironment) 
InstantiationUtil.instantiate(clazz);
@@ -101,26 +111,10 @@ public class KafkaShortRetentionTestBase implements Serializable {
                
kafkaServer.prepare(kafkaServer.createConfig().setKafkaServerProperties(specificProperties));
 
                standardProps = kafkaServer.getStandardProperties();
-
-               // start also a re-usable Flink mini cluster
-               
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
-               
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
-               flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
16L);
-               
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 
s");
-
-               flink = new LocalFlinkMiniCluster(flinkConfig, false);
-               flink.start();
-
-               TestStreamEnvironment.setAsContext(flink, PARALLELISM);
        }
 
        @AfterClass
        public static void shutDownServices() throws Exception {
-               TestStreamEnvironment.unsetAsContext();
-
-               if (flink != null) {
-                       flink.stop();
-               }
                kafkaServer.shutdown();
 
                secureProps.clear();
@@ -238,8 +232,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
 
                kafkaServer.createTestTopic(topic, parallelism, 1);
 
-               final StreamExecutionEnvironment env =
-                               
StreamExecutionEnvironment.createRemoteEnvironment("localhost", 
flink.getLeaderRPCPort());
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(parallelism);
                env.setRestartStrategy(RestartStrategies.noRestart()); // fail 
immediately
                env.getConfig().disableSysoutLogging();

Reply via email to