[FLINK-8703][tests] Port StreamingScalabilityAndLatency to MiniClusterResource

This closes #5668.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2c850d14
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2c850d14
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2c850d14

Branch: refs/heads/release-1.5
Commit: 2c850d14c9c11e421ae832e0ea62004f9ef27426
Parents: 44f7533
Author: zentol <[email protected]>
Authored: Tue Feb 27 15:21:50 2018 +0100
Committer: zentol <[email protected]>
Committed: Wed Mar 14 20:47:28 2018 +0100

----------------------------------------------------------------------
 .../manual/StreamingScalabilityAndLatency.java  | 23 ++++++++++----------
 1 file changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2c850d14/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
index efcefeb..a5b01bc 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
@@ -20,14 +20,13 @@ package org.apache.flink.test.manual;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.test.util.MiniClusterResource;
 
 import static org.junit.Assert.fail;
 
@@ -45,22 +44,24 @@ public class StreamingScalabilityAndLatency {
                final int slotsPerTaskManager = 80;
                final int parallelism = taskManagers * slotsPerTaskManager;
 
-               LocalFlinkMiniCluster cluster = null;
+               MiniClusterResource cluster = null;
 
                try {
                        Configuration config = new Configuration();
-                       config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, taskManagers);
                        config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L);
-                       config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slotsPerTaskManager);
                        config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 20000);
 
                        config.setInteger("taskmanager.net.server.numThreads", 1);
                        config.setInteger("taskmanager.net.client.numThreads", 1);
 
-                       cluster = new LocalFlinkMiniCluster(config, false);
-                       cluster.start();
+                       cluster = new MiniClusterResource(
+                               new MiniClusterResource.MiniClusterResourceConfiguration(
+                                       config,
+                                       taskManagers,
+                                       slotsPerTaskManager));
+                       cluster.before();
 
-                       runPartitioningProgram(cluster.getLeaderRPCPort(), parallelism);
+                       runPartitioningProgram(parallelism);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -68,13 +69,13 @@ public class StreamingScalabilityAndLatency {
                }
                finally {
                        if (cluster != null) {
-                               cluster.stop();
+                               cluster.after();
                        }
                }
        }
 
-       private static void runPartitioningProgram(int jobManagerPort, int parallelism) throws Exception {
-               StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort);
+       private static void runPartitioningProgram(int parallelism) throws Exception {
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(parallelism);
                env.getConfig().enableObjectReuse();
 

Reply via email to