[FLINK-8703][tests] Port KafkaShortRetentionTestBase to MiniClusterResource
This closes #5666. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ba43d6bc Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ba43d6bc Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ba43d6bc Branch: refs/heads/release-1.5 Commit: ba43d6bc586bb47c30c2b70f33cadf038f0323ef Parents: 882fc65 Author: zentol <[email protected]> Authored: Tue Feb 27 11:11:59 2018 +0100 Committer: zentol <[email protected]> Committed: Wed Mar 14 20:47:28 2018 +0100 ---------------------------------------------------------------------- .../kafka/KafkaShortRetentionTestBase.java | 41 ++++++++------------ 1 file changed, 17 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ba43d6bc/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java index de72985..15d972f 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java @@ -24,15 +24,14 @@ import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.streaming.util.TestStreamEnvironment; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; +import org.apache.flink.test.util.MiniClusterResource; import org.apache.flink.util.InstantiationUtil; import org.junit.AfterClass; @@ -68,21 +67,32 @@ public class KafkaShortRetentionTestBase implements Serializable { private static KafkaTestEnvironment kafkaServer; private static Properties standardProps; - private static LocalFlinkMiniCluster flink; + + @ClassRule + public static MiniClusterResource flink = new MiniClusterResource( + new MiniClusterResource.MiniClusterResourceConfiguration( + getConfiguration(), + NUM_TMS, + TM_SLOTS)); @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder(); protected static Properties secureProps = new Properties(); + private static Configuration getConfiguration() { + Configuration flinkConfig = new Configuration(); + flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L); + flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); + return flinkConfig; + } + @BeforeClass - public static void prepare() throws IOException, ClassNotFoundException { + public static void prepare() throws ClassNotFoundException { LOG.info("-------------------------------------------------------------------------"); LOG.info(" Starting KafkaShortRetentionTestBase "); LOG.info("-------------------------------------------------------------------------"); - Configuration flinkConfig = new Configuration(); - // dynamically load the implementation for the test Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl"); kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz); @@ -101,26 +111,10 @@ public class KafkaShortRetentionTestBase implements Serializable { kafkaServer.prepare(kafkaServer.createConfig().setKafkaServerProperties(specificProperties)); standardProps = kafkaServer.getStandardProperties(); - - // start also a re-usable Flink mini cluster - flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); - flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS); - flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L); - flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); - - flink = new LocalFlinkMiniCluster(flinkConfig, false); - flink.start(); - - TestStreamEnvironment.setAsContext(flink, PARALLELISM); } @AfterClass public static void shutDownServices() throws Exception { - TestStreamEnvironment.unsetAsContext(); - - if (flink != null) { - flink.stop(); - } kafkaServer.shutdown(); secureProps.clear(); @@ -238,8 +232,7 @@ public class KafkaShortRetentionTestBase implements Serializable { kafkaServer.createTestTopic(topic, parallelism, 1); - final StreamExecutionEnvironment env = - StreamExecutionEnvironment.createRemoteEnvironment("localhost", flink.getLeaderRPCPort()); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(parallelism); env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately env.getConfig().disableSysoutLogging();
