[FLINK-8703][tests] Port NotSoMiniClusterIterations to MiniClusterResource This closes #5667.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44f7533d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44f7533d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44f7533d Branch: refs/heads/release-1.5 Commit: 44f7533db0d0b0791a463d68e28b76fb7622fb88 Parents: ba43d6b Author: zentol <[email protected]> Authored: Tue Feb 27 15:19:50 2018 +0100 Committer: zentol <[email protected]> Committed: Wed Mar 14 20:47:28 2018 +0100 ---------------------------------------------------------------------- .../test/manual/NotSoMiniClusterIterations.java | 23 ++++++++++---------- 1 file changed, 12 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/44f7533d/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java b/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java index 9f6bcbb..abb8673 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java +++ b/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java @@ -24,12 +24,11 @@ import org.apache.flink.api.java.aggregation.Aggregations; import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.operators.DeltaIteration; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.examples.java.graph.ConnectedComponents; import org.apache.flink.examples.java.graph.util.ConnectedComponentsData; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.test.util.MiniClusterResource; import static org.junit.Assert.fail; @@ -46,23 +45,25 @@ public class NotSoMiniClusterIterations { throw new RuntimeException("This test program needs to run with at least 5GB of heap space."); } - LocalFlinkMiniCluster cluster = null; + MiniClusterResource cluster = null; try { Configuration config = new Configuration(); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, PARALLELISM); config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 8L); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1000); config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 8 * 1024); config.setInteger("taskmanager.net.server.numThreads", 1); config.setInteger("taskmanager.net.client.numThreads", 1); - cluster = new LocalFlinkMiniCluster(config, false); - cluster.start(); + cluster = new MiniClusterResource( + new MiniClusterResource.MiniClusterResourceConfiguration( + config, + PARALLELISM, + 1)); + cluster.before(); - runConnectedComponents(cluster.getLeaderRPCPort()); + runConnectedComponents(); } catch (Exception e) { e.printStackTrace(); @@ -70,14 +71,14 @@ public class NotSoMiniClusterIterations { } finally { if (cluster != null) { - cluster.stop(); + cluster.after(); } } } - private static void runConnectedComponents(int jmPort) throws Exception { + private static void runConnectedComponents() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jmPort); + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(PARALLELISM); env.getConfig().disableSysoutLogging();
