[FLINK-8703][tests] Migrate tests to MiniClusterResource (batch #2)

This closes #5542.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3e056b34
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3e056b34
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3e056b34

Branch: refs/heads/master
Commit: 3e056b34f7be817e0d0eee612b6ae44891e33501
Parents: 0ae7364
Author: zentol <[email protected]>
Authored: Tue Feb 20 18:02:32 2018 +0100
Committer: zentol <[email protected]>
Committed: Mon Feb 26 20:42:00 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/ml/util/FlinkTestBase.scala    |  25 ++--
 ...ScalaStreamingMultipleProgramsTestBase.scala |  29 ++---
 ...tractEventTimeWindowCheckpointingITCase.java |  86 ++++++-------
 .../AbstractLocalRecoveryITCase.java            | 100 ++++++++++++++++
 ...ckendEventTimeWindowCheckpointingITCase.java |   5 +-
 ...ckendEventTimeWindowCheckpointingITCase.java |   6 +-
 ...ckendEventTimeWindowCheckpointingITCase.java |   6 +-
 ...ckendEventTimeWindowCheckpointingITCase.java |   5 +-
 ...ckendEventTimeWindowCheckpointingITCase.java |   5 +-
 .../checkpointing/LocalRecoveryHeapITCase.java  |  33 +++++
 .../test/checkpointing/LocalRecoveryITCase.java | 120 -------------------
 .../LocalRecoveryRocksDBFullITCase.java         |  33 +++++
 .../LocalRecoveryRocksDBIncrementalITCase.java  |  33 +++++
 ...ckendEventTimeWindowCheckpointingITCase.java |   5 +-
 ...ckendEventTimeWindowCheckpointingITCase.java |   5 +-
 .../operators/CustomDistributionITCase.java     |  42 ++-----
 .../flink/test/runtime/IPv6HostnamesITCase.java |  47 ++++----
 17 files changed, 311 insertions(+), 274 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
 
b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
index c27a2b5..2165152 100644
--- 
a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
+++ 
b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
@@ -18,8 +18,9 @@
 
 package org.apache.flink.ml.util
 
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
-import org.apache.flink.test.util.{TestBaseUtils, TestEnvironment}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.test.util.MiniClusterResource
+import 
org.apache.flink.test.util.MiniClusterResource.MiniClusterResourceConfiguration
 import org.scalatest.{BeforeAndAfter, Suite}
 
 /** Mixin to start and stop a LocalFlinkMiniCluster automatically for Scala 
based tests.
@@ -51,27 +52,21 @@ import org.scalatest.{BeforeAndAfter, Suite}
 trait FlinkTestBase extends BeforeAndAfter {
   that: Suite =>
 
-  var cluster: Option[LocalFlinkMiniCluster] = None
+  var cluster: Option[MiniClusterResource] = None
   val parallelism = 4
 
   before {
-    val cl = TestBaseUtils.startCluster(
-      1,
-      parallelism,
-      false,
-      false,
-      true)
-
-    val clusterEnvironment = new TestEnvironment(cl, parallelism, false)
-    clusterEnvironment.setAsContext()
+    val cl = new MiniClusterResource(
+      new MiniClusterResourceConfiguration(new Configuration(), 1, parallelism)
+    )
+    
+    cl.before()
 
     cluster = Some(cl)
   }
 
   after {
-    cluster.foreach(c => TestBaseUtils.stopCluster(c, 
TestBaseUtils.DEFAULT_TIMEOUT))
-
-    TestEnvironment.unsetAsContext()
+    cluster.foreach(c => c.after())
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
index d9f727c..e0c5b45 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
@@ -18,12 +18,10 @@
 
 package org.apache.flink.streaming.api.scala
 
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
-import org.apache.flink.streaming.util.TestStreamEnvironment
-import org.apache.flink.test.util.TestBaseUtils
-
+import org.apache.flink.configuration.Configuration
+import 
org.apache.flink.test.util.MiniClusterResource.MiniClusterResourceConfiguration
+import org.apache.flink.test.util.{MiniClusterResource, TestBaseUtils}
 import org.junit.{After, Before}
-
 import org.scalatest.junit.JUnitSuiteLike
 
 trait ScalaStreamingMultipleProgramsTestBase
@@ -31,28 +29,21 @@ trait ScalaStreamingMultipleProgramsTestBase
   with  JUnitSuiteLike {
 
   val parallelism = 4
-  var cluster: Option[LocalFlinkMiniCluster] = None
+  var cluster: Option[MiniClusterResource] = None
 
   @Before
   def beforeAll(): Unit = {
-    val cluster = Some(
-      TestBaseUtils.startCluster(
-        1,
-        parallelism,
-        false,
-        false,
-        true
-      )
+    val cl = new MiniClusterResource(
+      new MiniClusterResourceConfiguration(new Configuration(), 1, parallelism)
     )
 
-    TestStreamEnvironment.setAsContext(cluster.get, parallelism)
+    cl.before()
+
+    cluster = Some(cl)
   }
 
   @After
   def afterAll(): Unit = {
-    TestStreamEnvironment.unsetAsContext()
-    cluster.foreach {
-      TestBaseUtils.stopCluster(_, TestBaseUtils.DEFAULT_TIMEOUT)
-    }
+    cluster.foreach { c => c.after() }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index 557c097..f37ba0d 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -26,35 +26,32 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.curator.test.TestingServer;
 import org.junit.After;
-import org.junit.Before;
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -65,9 +62,6 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static 
org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
@@ -91,31 +85,42 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
        private static final int MAX_MEM_STATE_SIZE = 20 * 1024 * 1024;
        private static final int PARALLELISM = 4;
 
-       private static LocalFlinkMiniCluster cluster;
+       private TestingServer zkServer;
 
-       private static TestStreamEnvironment env;
-
-       private static TestingServer zkServer;
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
+       @ClassRule
+       public static TemporaryFolder tempFolder = new TemporaryFolder();
 
        @Rule
        public TestName name = new TestName();
 
-       private StateBackendEnum stateBackendEnum;
-       protected AbstractStateBackend stateBackend;
+       private AbstractStateBackend stateBackend;
 
-       AbstractEventTimeWindowCheckpointingITCase(StateBackendEnum 
stateBackendEnum) {
-               this.stateBackendEnum = stateBackendEnum;
-       }
+       @Rule
+       public final MiniClusterResource miniClusterResource = 
getMiniClusterResource();
 
        enum StateBackendEnum {
                MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, 
ROCKSDB_INCREMENTAL_ZK, MEM_ASYNC, FILE_ASYNC
        }
 
-       @Before
-       public void startTestCluster() throws Exception {
+       protected abstract StateBackendEnum getStateBackend();
+
+       protected final MiniClusterResource getMiniClusterResource() {
+               return new MiniClusterResource(
+                       new 
MiniClusterResource.MiniClusterResourceConfiguration(
+                               getConfigurationSafe(),
+                               2,
+                               PARALLELISM / 2));
+       }
+
+       private Configuration getConfigurationSafe() {
+               try {
+                       return getConfiguration();
+               } catch (Exception e) {
+                       throw new AssertionError("Could not initialize test.", 
e);
+               }
+       }
+
+       private Configuration getConfiguration() throws Exception {
 
                // print a message when starting a test method to avoid Travis' 
<tt>"Maven produced no
                // output for xxx seconds."</tt> messages
@@ -123,6 +128,7 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
                        "Starting " + getClass().getCanonicalName() + "#" + 
name.getMethodName() + ".");
 
                // Testing HA Scenario / ZKCompletedCheckpointStore with 
incremental checkpoints
+               StateBackendEnum stateBackendEnum = getStateBackend();
                if (ROCKSDB_INCREMENTAL_ZK.equals(stateBackendEnum)) {
                        zkServer = new TestingServer();
                        zkServer.start();
@@ -130,23 +136,6 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
 
                Configuration config = createClusterConfig();
 
-               // purposefully delay in the executor to tease out races
-               final ScheduledExecutorService executor = 
Executors.newScheduledThreadPool(10);
-               HighAvailabilityServices haServices = 
HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
-                       config,
-                       new Executor() {
-                               @Override
-                               public void execute(Runnable command) {
-                                       executor.schedule(command, 500, 
MILLISECONDS);
-                               }
-                       });
-
-               cluster = new LocalFlinkMiniCluster(config, haServices, false);
-               cluster.start();
-
-               env = new TestStreamEnvironment(cluster, PARALLELISM);
-               env.getConfig().setUseSnapshotCompression(true);
-
                switch (stateBackendEnum) {
                        case MEM:
                                this.stateBackend = new 
MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
@@ -190,6 +179,7 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
                        default:
                                throw new IllegalStateException("No backend 
selected.");
                }
+               return config;
        }
 
        protected Configuration createClusterConfig() throws IOException {
@@ -198,8 +188,6 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
                final File haDir = temporaryFolder.newFolder();
 
                Configuration config = new Configuration();
-               config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
-               config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
PARALLELISM / 2);
                config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 48L);
                // the default network buffers size (10% of heap max =~ 150MB) 
seems to much for this test case
                config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 
80L << 20); // 80 MB
@@ -215,11 +203,6 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
 
        @After
        public void stopTestCluster() throws IOException {
-               if (cluster != null) {
-                       cluster.stop();
-                       cluster = null;
-               }
-
                if (zkServer != null) {
                        zkServer.stop();
                        zkServer = null;
@@ -241,12 +224,14 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
                FailingSource.reset();
 
                try {
+                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                        env.setParallelism(PARALLELISM);
                        
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                        env.enableCheckpointing(100);
                        
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
                        env.getConfig().disableSysoutLogging();
                        env.setStateBackend(this.stateBackend);
+                       env.getConfig().setUseSnapshotCompression(true);
 
                        env
                                        .addSource(new FailingSource(numKeys, 
numElementsPerKey, numElementsPerKey / 3))
@@ -310,6 +295,7 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
                FailingSource.reset();
 
                try {
+                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                        env.setParallelism(PARALLELISM);
                        env.setMaxParallelism(maxParallelism);
                        
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
@@ -317,6 +303,7 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
                        
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
                        env.getConfig().disableSysoutLogging();
                        env.setStateBackend(this.stateBackend);
+                       env.getConfig().setUseSnapshotCompression(true);
 
                        env
                                        .addSource(new FailingSource(numKeys, 
numElementsPerKey, numElementsPerKey / 3))
@@ -376,6 +363,7 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
                FailingSource.reset();
 
                try {
+                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                        env.setMaxParallelism(2 * PARALLELISM);
                        env.setParallelism(PARALLELISM);
                        
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
@@ -438,12 +426,14 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
                FailingSource.reset();
 
                try {
+                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                        env.setParallelism(PARALLELISM);
                        
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                        env.enableCheckpointing(100);
                        
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
                        env.getConfig().disableSysoutLogging();
                        env.setStateBackend(this.stateBackend);
+                       env.getConfig().setUseSnapshotCompression(true);
 
                        env
                                        .addSource(new FailingSource(numKeys, 
numElementsPerKey, numElementsPerKey / 3))
@@ -507,12 +497,14 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
                FailingSource.reset();
 
                try {
+                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                        env.setParallelism(PARALLELISM);
                        
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                        env.enableCheckpointing(100);
                        
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
                        env.getConfig().disableSysoutLogging();
                        env.setStateBackend(this.stateBackend);
+                       env.getConfig().setUseSnapshotCompression(true);
 
                        env
                                        .addSource(new FailingSource(numKeys, 
numElementsPerKey, numElementsPerKey / 3))

http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
new file mode 100644
index 0000000..a02e902
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+
+import static 
org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode;
+import static 
org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum;
+
+/**
+ * This test delegates to instances of {@link 
AbstractEventTimeWindowCheckpointingITCase} that have been reconfigured
+ * to use local recovery.
+ *
+ * <p>TODO: This class must be refactored to properly extend {@link 
AbstractEventTimeWindowCheckpointingITCase}.
+ */
+public abstract class AbstractLocalRecoveryITCase extends TestLogger {
+
+       private final StateBackendEnum backendEnum;
+       private final LocalRecoveryMode recoveryMode;
+
+       @Rule
+       public TestName testName = new TestName();
+
+       AbstractLocalRecoveryITCase(StateBackendEnum backendEnum, 
LocalRecoveryMode recoveryMode) {
+               this.backendEnum = backendEnum;
+               this.recoveryMode = recoveryMode;
+       }
+
+       @Test
+       public final void executeTest() throws Exception {
+               AbstractEventTimeWindowCheckpointingITCase.tempFolder.create();
+               AbstractEventTimeWindowCheckpointingITCase windowChkITCase =
+                       new AbstractEventTimeWindowCheckpointingITCase() {
+                               @Override
+                               protected StateBackendEnum getStateBackend() {
+                                       return backendEnum;
+                               }
+
+                               @Override
+                               protected Configuration createClusterConfig() 
throws IOException {
+                                       Configuration config = 
super.createClusterConfig();
+
+                                       config.setString(
+                                               
CheckpointingOptions.LOCAL_RECOVERY,
+                                               recoveryMode.toString());
+
+                                       return config;
+                               }
+                       };
+
+               executeTest(windowChkITCase);
+       }
+
+       private void executeTest(AbstractEventTimeWindowCheckpointingITCase 
delegate) throws Exception {
+               delegate.name = testName;
+               try {
+                       delegate.miniClusterResource.before();
+                       try {
+                               delegate.testTumblingTimeWindow();
+                               delegate.miniClusterResource.after();
+                       } catch (Exception e) {
+                               delegate.miniClusterResource.after();
+                       }
+
+                       delegate.miniClusterResource.before();
+                       try {
+                               delegate.testSlidingTimeWindow();
+                               delegate.miniClusterResource.after();
+                       } catch (Exception e) {
+                               delegate.miniClusterResource.after();
+                       }
+               } finally {
+                       delegate.tempFolder.delete();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
index f0db4d5..c4b06d4 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
@@ -23,7 +23,8 @@ package org.apache.flink.test.checkpointing;
  */
 public class AsyncFileBackendEventTimeWindowCheckpointingITCase extends 
AbstractEventTimeWindowCheckpointingITCase {
 
-       public AsyncFileBackendEventTimeWindowCheckpointingITCase() {
-               super(StateBackendEnum.FILE_ASYNC);
+       @Override
+       protected StateBackendEnum getStateBackend() {
+               return StateBackendEnum.FILE_ASYNC;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
index 70ec757..2cc5b01 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
@@ -22,8 +22,8 @@ package org.apache.flink.test.checkpointing;
  * Integration tests for asynchronous memory backend.
  */
 public class AsyncMemBackendEventTimeWindowCheckpointingITCase extends 
AbstractEventTimeWindowCheckpointingITCase {
-
-       public AsyncMemBackendEventTimeWindowCheckpointingITCase() {
-               super(StateBackendEnum.MEM_ASYNC);
+       @Override
+       protected StateBackendEnum getStateBackend() {
+               return StateBackendEnum.MEM_ASYNC;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
index 030c1a3..eab6153 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
@@ -22,8 +22,8 @@ package org.apache.flink.test.checkpointing;
  * Integration tests for file backend.
  */
 public class FileBackendEventTimeWindowCheckpointingITCase extends 
AbstractEventTimeWindowCheckpointingITCase {
-
-       public FileBackendEventTimeWindowCheckpointingITCase() {
-               super(StateBackendEnum.FILE);
+       @Override
+       protected StateBackendEnum getStateBackend() {
+               return StateBackendEnum.FILE;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
index 394815f..ed43ad6 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
@@ -23,8 +23,9 @@ package org.apache.flink.test.checkpointing;
  */
 public class HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase 
extends AbstractEventTimeWindowCheckpointingITCase {
 
-       public HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase() {
-               super(StateBackendEnum.ROCKSDB_INCREMENTAL_ZK);
+       @Override
+       protected StateBackendEnum getStateBackend() {
+               return StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
index dfb66cc..1276a00 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
@@ -23,8 +23,9 @@ package org.apache.flink.test.checkpointing;
  */
 public class IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase 
extends AbstractEventTimeWindowCheckpointingITCase {
 
-       public IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase() {
-               super(StateBackendEnum.ROCKSDB_INCREMENTAL);
+       @Override
+       protected StateBackendEnum getStateBackend() {
+               return StateBackendEnum.ROCKSDB_INCREMENTAL;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java
new file mode 100644
index 0000000..2c0c294
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import static 
org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED;
+import static 
org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.FILE_ASYNC;
+
+/**
+ * Tests file-based local recovery with the HeapBackend.
+ */
+public class LocalRecoveryHeapITCase extends AbstractLocalRecoveryITCase {
+       public LocalRecoveryHeapITCase() {
+               super(
+                       FILE_ASYNC,
+                       ENABLE_FILE_BASED);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java
deleted file mode 100644
index 51b3b84..0000000
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-import java.io.IOException;
-
-import static 
org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode;
-import static 
org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED;
-import static 
org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum;
-import static 
org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.FILE_ASYNC;
-import static 
org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_FULLY_ASYNC;
-import static 
org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
-
-/**
- * This test delegates to instances of {@link 
AbstractEventTimeWindowCheckpointingITCase} that have been reconfigured
- * to use local recovery.
- */
-public class LocalRecoveryITCase extends TestLogger {
-
-       @Rule
-       public TestName testName = new TestName();
-
-       @Test
-       public void testLocalRecoveryHeapBackendFileBased() throws Exception {
-               executeTest(
-                       FILE_ASYNC,
-                       ENABLE_FILE_BASED);
-       }
-
-       @Test
-       public void testLocalRecoveryRocksIncrementalFileBased() throws 
Exception {
-               executeTest(
-                       ROCKSDB_INCREMENTAL_ZK,
-                       ENABLE_FILE_BASED);
-       }
-
-       @Test
-       public void testLocalRecoveryRocksFullFileBased() throws Exception {
-               executeTest(
-                       ROCKSDB_FULLY_ASYNC,
-                       ENABLE_FILE_BASED);
-       }
-
-       private void executeTest(
-               StateBackendEnum backendEnum,
-               LocalRecoveryMode recoveryMode) throws Exception {
-
-               AbstractEventTimeWindowCheckpointingITCase windowChkITCase =
-                       new 
AbstractEventTimeWindowCheckpointingITCaseWithLocalRecovery(
-                               backendEnum,
-                               recoveryMode);
-
-               executeTest(windowChkITCase);
-       }
-
-       private void executeTest(AbstractEventTimeWindowCheckpointingITCase 
delegate) throws Exception {
-               delegate.name = testName;
-               delegate.tempFolder.create();
-               try {
-                       delegate.startTestCluster();
-                       delegate.testTumblingTimeWindow();
-                       delegate.stopTestCluster();
-
-                       delegate.startTestCluster();
-                       delegate.testSlidingTimeWindow();
-                       delegate.stopTestCluster();
-               } finally {
-                       delegate.tempFolder.delete();
-               }
-       }
-
-       private static class 
AbstractEventTimeWindowCheckpointingITCaseWithLocalRecovery
-               extends AbstractEventTimeWindowCheckpointingITCase {
-
-               private final LocalRecoveryMode recoveryMode;
-
-               AbstractEventTimeWindowCheckpointingITCaseWithLocalRecovery(
-                       StateBackendEnum stateBackendEnum,
-                       LocalRecoveryMode recoveryMode) {
-
-                       super(stateBackendEnum);
-                       this.recoveryMode = recoveryMode;
-               }
-
-               @Override
-               protected Configuration createClusterConfig() throws 
IOException {
-                       Configuration config = super.createClusterConfig();
-
-                       config.setString(
-                               CheckpointingOptions.LOCAL_RECOVERY,
-                               recoveryMode.toString());
-
-                       return config;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java
new file mode 100644
index 0000000..16bbbfc
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import static 
org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED;
+import static 
org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_FULLY_ASYNC;
+
+/**
+ * Tests file-based local recovery with the RocksDB state-backend.
+ */
+public class LocalRecoveryRocksDBFullITCase extends 
AbstractLocalRecoveryITCase {
+       public LocalRecoveryRocksDBFullITCase() {
+               super(
+                       ROCKSDB_FULLY_ASYNC,
+                       ENABLE_FILE_BASED);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java
new file mode 100644
index 0000000..fa8e139
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import static 
org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED;
+import static 
org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
+
+/**
+ * Tests file-based local recovery with the RocksDB state-backend and 
incremental checkpointing enabled.
+ */
+public class LocalRecoveryRocksDBIncrementalITCase extends 
AbstractLocalRecoveryITCase {
+       public LocalRecoveryRocksDBIncrementalITCase() {
+               super(
+                       ROCKSDB_INCREMENTAL_ZK,
+                       ENABLE_FILE_BASED);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
index 54a29ed..e153b4b 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
@@ -23,7 +23,8 @@ package org.apache.flink.test.checkpointing;
  */
 public class MemBackendEventTimeWindowCheckpointingITCase extends 
AbstractEventTimeWindowCheckpointingITCase {
 
-       public MemBackendEventTimeWindowCheckpointingITCase() {
-               super(StateBackendEnum.MEM);
+       @Override
+       protected StateBackendEnum getStateBackend() {
+               return StateBackendEnum.MEM;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
index 3873aff..e6d5b9e 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
@@ -23,8 +23,9 @@ package org.apache.flink.test.checkpointing;
  */
 public class RocksDbBackendEventTimeWindowCheckpointingITCase extends 
AbstractEventTimeWindowCheckpointingITCase {
 
-       public RocksDbBackendEventTimeWindowCheckpointingITCase() {
-               super(StateBackendEnum.ROCKSDB_FULLY_ASYNC);
+       @Override
+       protected StateBackendEnum getStateBackend() {
+               return StateBackendEnum.ROCKSDB_FULLY_ASYNC;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java
index 2452475..74b8cf7 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java
@@ -27,19 +27,15 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.test.operators.util.CollectionDataSets;
-import org.apache.flink.test.util.TestBaseUtils;
-import org.apache.flink.test.util.TestEnvironment;
+import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -52,32 +48,12 @@ import static org.junit.Assert.fail;
 @SuppressWarnings("serial")
 public class CustomDistributionITCase extends TestLogger {
 
-       // 
------------------------------------------------------------------------
-       //  The mini cluster that is shared across tests
-       // 
------------------------------------------------------------------------
-
-       private static LocalFlinkMiniCluster cluster;
-
-       @BeforeClass
-       public static void setup() throws Exception {
-               cluster = TestBaseUtils.startCluster(1, 8, false, false, true);
-       }
-
-       @AfterClass
-       public static void teardown() throws Exception {
-               TestBaseUtils.stopCluster(cluster, 
TestBaseUtils.DEFAULT_TIMEOUT);
-       }
-
-       @Before
-       public void prepare() {
-               TestEnvironment clusterEnv = new TestEnvironment(cluster, 1, 
false);
-               clusterEnv.setAsContext();
-       }
-
-       @After
-       public void cleanup() {
-               TestEnvironment.unsetAsContext();
-       }
+       @ClassRule
+       public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new 
MiniClusterResource(
+               new MiniClusterResource.MiniClusterResourceConfiguration(
+                       new Configuration(),
+                       1,
+                       8));
 
        // 
------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3e056b34/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
index f24d21e..4a7b345 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
@@ -27,14 +27,16 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
 
 import akka.actor.ActorSystem;
+import org.junit.AssumptionViolatedException;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -56,31 +58,33 @@ import static org.junit.Assert.fail;
 @SuppressWarnings("serial")
 public class IPv6HostnamesITCase extends TestLogger {
 
-       @Test
-       public void testClusterWithIPv6host() {
+       @Rule
+       public final MiniClusterResource miniClusterResource = new 
MiniClusterResource(
+               new MiniClusterResource.MiniClusterResourceConfiguration(
+                       getConfiguration(),
+                       2,
+                       2));
 
+       private Configuration getConfiguration() {
                final Inet6Address ipv6address = getLocalIPv6Address();
                if (ipv6address == null) {
-                       System.err.println("--- Cannot find a non-loopback 
local IPv6 address that Akka/Netty can bind to; skipping IPv6HostnamesITCase");
-                       return;
+                       throw new AssumptionViolatedException("--- Cannot find 
a non-loopback local IPv6 address that Akka/Netty can bind to; skipping 
IPv6HostnamesITCase");
                }
+               final String addressString = ipv6address.getHostAddress();
+               log.info("Test will use IPv6 address " + addressString + " for 
connection tests");
+
+               Configuration config = new Configuration();
+               config.setString(JobManagerOptions.ADDRESS, addressString);
+               config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, 
addressString);
+               config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
+               return config;
+       }
 
-               LocalFlinkMiniCluster flink = null;
+       @Test
+       public void testClusterWithIPv6host() {
                try {
-                       final String addressString = 
ipv6address.getHostAddress();
-                       log.info("Test will use IPv6 address " + addressString 
+ " for connection tests");
 
-                       Configuration conf = new Configuration();
-                       conf.setString(JobManagerOptions.ADDRESS, 
addressString);
-                       
conf.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, addressString);
-                       
conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
-                       
conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
-                       conf.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
16L);
-
-                       flink = new LocalFlinkMiniCluster(conf, false);
-                       flink.start();
-
-                       ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment(addressString, 
flink.getLeaderRPCPort());
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                        env.setParallelism(4);
                        env.getConfig().disableSysoutLogging();
 
@@ -108,11 +112,6 @@ public class IPv6HostnamesITCase extends TestLogger {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
-               finally {
-                       if (flink != null) {
-                               flink.stop();
-                       }
-               }
        }
 
        private Inet6Address getLocalIPv6Address() {

Reply via email to