[FLINK-8703][tests] Port StreamingScalabilityAndLatency to MiniClusterResource
This closes #5668. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2c850d14 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2c850d14 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2c850d14 Branch: refs/heads/release-1.5 Commit: 2c850d14c9c11e421ae832e0ea62004f9ef27426 Parents: 44f7533 Author: zentol <[email protected]> Authored: Tue Feb 27 15:21:50 2018 +0100 Committer: zentol <[email protected]> Committed: Wed Mar 14 20:47:28 2018 +0100 ---------------------------------------------------------------------- .../manual/StreamingScalabilityAndLatency.java | 23 ++++++++++---------- 1 file changed, 12 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2c850d14/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java index efcefeb..a5b01bc 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java +++ b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java @@ -20,14 +20,13 @@ package org.apache.flink.test.manual; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.test.util.MiniClusterResource; import static org.junit.Assert.fail; @@ -45,22 +44,24 @@ public class StreamingScalabilityAndLatency { final int slotsPerTaskManager = 80; final int parallelism = taskManagers * slotsPerTaskManager; - LocalFlinkMiniCluster cluster = null; + MiniClusterResource cluster = null; try { Configuration config = new Configuration(); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, taskManagers); config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slotsPerTaskManager); config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 20000); config.setInteger("taskmanager.net.server.numThreads", 1); config.setInteger("taskmanager.net.client.numThreads", 1); - cluster = new LocalFlinkMiniCluster(config, false); - cluster.start(); + cluster = new MiniClusterResource( + new MiniClusterResource.MiniClusterResourceConfiguration( + config, + taskManagers, + slotsPerTaskManager)); + cluster.before(); - runPartitioningProgram(cluster.getLeaderRPCPort(), parallelism); + runPartitioningProgram(parallelism); } catch (Exception e) { e.printStackTrace(); @@ -68,13 +69,13 @@ public class StreamingScalabilityAndLatency { } finally { if (cluster != null) { - cluster.stop(); + cluster.after(); } } } - private static void runPartitioningProgram(int jobManagerPort, int parallelism) throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort); + private static void runPartitioningProgram(int parallelism) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(parallelism); env.getConfig().enableObjectReuse();
