Revert "[FLINK-8703][tests] Port AutoParallelismITCase to flip6". The test does not actually run on Flip6; see FLINK-8813.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/00b73ef7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/00b73ef7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/00b73ef7 Branch: refs/heads/master Commit: 00b73ef79434b4adb302c6b9126a5c24825db121 Parents: 7e43f81 Author: zentol <[email protected]> Authored: Wed Mar 21 20:59:05 2018 +0100 Committer: zentol <[email protected]> Committed: Wed Mar 21 21:01:52 2018 +0100 ---------------------------------------------------------------------- .../flink/test/misc/AutoParallelismITCase.java | 42 +++++++++++++++----- 1 file changed, 32 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/00b73ef7/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java index c25dbf0..9cafee6 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java @@ -22,15 +22,17 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.RichMapPartitionFunction; import org.apache.flink.api.common.io.GenericInputFormat; import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.LocalCollectionOutputFormat; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.GenericInputSplit; -import org.apache.flink.test.util.MiniClusterResource; +import 
org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.test.util.TestEnvironment; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; -import org.junit.ClassRule; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; import java.io.IOException; @@ -50,17 +52,37 @@ public class AutoParallelismITCase extends TestLogger { private static final int SLOTS_PER_TM = 7; private static final int PARALLELISM = NUM_TM * SLOTS_PER_TM; - @ClassRule - public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - new Configuration(), - 2, - 7)); + private static LocalFlinkMiniCluster cluster; + + private static TestEnvironment env; + + @BeforeClass + public static void setupCluster() { + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TM); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TM); + cluster = new LocalFlinkMiniCluster(config, false); + + cluster.start(); + + env = new TestEnvironment(cluster, NUM_TM * SLOTS_PER_TM, false); + } + + @AfterClass + public static void teardownCluster() { + try { + cluster.stop(); + } + catch (Throwable t) { + System.err.println("Error stopping cluster on shutdown"); + t.printStackTrace(); + fail("ClusterClient shutdown caused an exception: " + t.getMessage()); + } + } @Test public void testProgramWithAutoParallelism() { try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX); env.getConfig().disableSysoutLogging();
