[FLINK-8703][tests] Port NotSoMiniClusterIterations to MiniClusterResource

This closes #5667.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44f7533d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44f7533d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44f7533d

Branch: refs/heads/release-1.5
Commit: 44f7533db0d0b0791a463d68e28b76fb7622fb88
Parents: ba43d6b
Author: zentol <[email protected]>
Authored: Tue Feb 27 15:19:50 2018 +0100
Committer: zentol <[email protected]>
Committed: Wed Mar 14 20:47:28 2018 +0100

----------------------------------------------------------------------
 .../test/manual/NotSoMiniClusterIterations.java | 23 ++++++++++----------
 1 file changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/44f7533d/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java b/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java
index 9f6bcbb..abb8673 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java
@@ -24,12 +24,11 @@ import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.examples.java.graph.ConnectedComponents;
 import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.util.MiniClusterResource;
 
 import static org.junit.Assert.fail;
 
@@ -46,23 +45,25 @@ public class NotSoMiniClusterIterations {
                        throw new RuntimeException("This test program needs to run with at least 5GB of heap space.");
                }
 
-               LocalFlinkMiniCluster cluster = null;
+               MiniClusterResource cluster = null;
 
                try {
                        Configuration config = new Configuration();
-                       config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, PARALLELISM);
                        config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 8L);
-                       config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
                        config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1000);
                        config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 8 * 1024);
 
                        config.setInteger("taskmanager.net.server.numThreads", 1);
                        config.setInteger("taskmanager.net.client.numThreads", 1);
 
-                       cluster = new LocalFlinkMiniCluster(config, false);
-                       cluster.start();
+                       cluster = new MiniClusterResource(
+                               new MiniClusterResource.MiniClusterResourceConfiguration(
+                                       config,
+                                       PARALLELISM,
+                                       1));
+                       cluster.before();
 
-                       runConnectedComponents(cluster.getLeaderRPCPort());
+                       runConnectedComponents();
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -70,14 +71,14 @@ public class NotSoMiniClusterIterations {
                }
                finally {
                        if (cluster != null) {
-                               cluster.stop();
+                               cluster.after();
                        }
                }
        }
 
-       private static void runConnectedComponents(int jmPort) throws Exception {
+       private static void runConnectedComponents() throws Exception {
 
-               ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jmPort);
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(PARALLELISM);
                env.getConfig().disableSysoutLogging();
 

Reply via email to