[FLINK-7909] Replace StreamingMultipleProgramsTestBase by AbstractTestBase

AbstractTestBase fully subsumes the functionality of
StreamingMultipleProgramsTestBase, since it is now the most general test base
for streaming and batch jobs. As a consequence, we can safely remove
StreamingMultipleProgramsTestBase and let all corresponding tests extend
AbstractTestBase instead.

This closes #4896.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b90210e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b90210e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b90210e3

Branch: refs/heads/master
Commit: b90210e3712a54ad85a33dfc308a03e0c4a2a250
Parents: 3c5c832
Author: Till Rohrmann <[email protected]>
Authored: Tue Oct 24 16:20:15 2017 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Tue Jan 9 08:05:51 2018 +0100

----------------------------------------------------------------------
 .../ElasticsearchSinkTestBase.java              |  8 ++-
 .../connectors/fs/RollingSinkITCase.java        | 22 +++++--
 .../connectors/fs/RollingSinkSecuredITCase.java | 62 +++++++++----------
 .../mapred/HadoopIOFormatsITCase.java           | 22 +++----
 .../mapred/HadoopMapredITCase.java              |  1 -
 .../mapreduce/HadoopInputOutputITCase.java      |  1 -
 .../apache/flink/storm/split/SplitITCase.java   |  4 +-
 .../examples/windowing/TopSpeedWindowing.java   |  1 -
 .../streaming/test/StreamingExamplesITCase.java |  4 +-
 .../TopSpeedWindowingExampleITCase.java         | 50 +++++++++------
 .../socket/SocketWindowWordCountITCase.java     |  4 +-
 .../examples/StreamingExamplesITCase.scala      | 22 +++----
 .../java/org/apache/flink/cep/CEPITCase.java    |  4 +-
 .../table/runtime/stream/sql/JavaSqlITCase.java |  4 +-
 .../flink/table/api/stream/ExplainTest.scala    |  4 +-
 .../UnsupportedOpsValidationTest.scala          |  4 +-
 .../runtime/stream/TimeAttributesITCase.scala   | 10 +--
 .../runtime/stream/sql/TableSourceITCase.scala  |  4 +-
 .../table/runtime/stream/table/CalcITCase.scala |  9 ++-
 .../runtime/stream/table/CorrelateITCase.scala  |  4 +-
 .../stream/table/GroupWindowITCase.scala        | 10 +--
 .../stream/table/SetOperatorsITCase.scala       |  4 +-
 .../runtime/stream/table/TableSinkITCase.scala  |  7 +--
 .../stream/table/TableSourceITCase.scala        |  6 +-
 .../utils/StreamingWithStateTestBase.scala      |  4 +-
 .../flink/streaming/api/DataStreamTest.java     |  3 +-
 .../scala/api/CsvOutputFormatITCase.java        |  4 +-
 .../scala/api/TextOutputFormatITCase.java       |  4 +-
 .../streaming/api/scala/CoGroupJoinITCase.scala |  7 +--
 .../streaming/api/scala/DataStreamTest.scala    | 28 ++++++---
 .../streaming/api/scala/SideOutputITCase.scala  |  4 +-
 .../api/scala/TimeWindowTranslationTest.scala   |  4 +-
 .../streaming/api/scala/WindowFoldITCase.scala  |  6 +-
 .../api/scala/WindowReduceITCase.scala          |  6 +-
 .../util/StreamingMultipleProgramsTestBase.java | 64 --------------------
 .../flink/test/util/AbstractTestBase.java       | 14 ++---
 .../flink/test/util/MiniClusterResource.java    | 11 ++--
 .../test/util/MultipleProgramsTestBase.java     | 48 ++++-----------
 .../apache/flink/test/util/TestBaseUtils.java   |  2 +-
 .../CoStreamCheckpointingITCase.java            |  4 +-
 .../StreamCheckpointNotifierITCase.java         |  4 +-
 .../test/state/ManualWindowSpeedITCase.java     |  4 +-
 .../streaming/api/StreamingOperatorsITCase.java |  4 +-
 .../runtime/ChainedRuntimeContextITCase.java    |  4 +-
 .../streaming/runtime/CoGroupJoinITCase.java    |  4 +-
 .../test/streaming/runtime/CoStreamITCase.java  |  4 +-
 .../streaming/runtime/DataStreamPojoITCase.java |  4 +-
 .../streaming/runtime/DirectedOutputITCase.java |  4 +-
 .../test/streaming/runtime/IterateITCase.java   | 26 ++++----
 .../streaming/runtime/OutputSplitterITCase.java |  4 +-
 .../streaming/runtime/PartitionerITCase.java    |  4 +-
 .../streaming/runtime/SelfConnectionITCase.java |  4 +-
 .../streaming/runtime/SideOutputITCase.java     |  4 +-
 .../streaming/runtime/StateBackendITCase.java   |  4 +-
 .../runtime/StreamTaskTimerITCase.java          |  4 +-
 .../streaming/runtime/WindowFoldITCase.java     |  4 +-
 .../sessionwindows/SessionWindowITCase.java     |  4 +-
 57 files changed, 253 insertions(+), 321 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
index 297bc5d..b90e8ed 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
@@ -22,7 +22,7 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.elasticsearch.client.Client;
@@ -31,6 +31,8 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
 import java.util.Collections;
@@ -44,7 +46,9 @@ import static org.junit.Assert.fail;
 /**
  * Environment preparation and suite of tests for version-specific {@link 
ElasticsearchSinkBase} implementations.
  */
-public abstract class ElasticsearchSinkTestBase extends 
StreamingMultipleProgramsTestBase {
+public abstract class ElasticsearchSinkTestBase extends AbstractTestBase {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ElasticsearchSinkTestBase.class);
 
        protected static final String CLUSTER_NAME = "test-cluster";
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index 10d1846..78f643f 100644
--- 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -32,9 +32,10 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.TestLogger;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Type;
@@ -83,13 +84,14 @@ import java.util.Map;
  * @deprecated should be removed with the {@link RollingSink}.
  */
 @Deprecated
-public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
+public class RollingSinkITCase extends TestLogger {
 
        protected static final Logger LOG = 
LoggerFactory.getLogger(RollingSinkITCase.class);
 
        @ClassRule
        public static TemporaryFolder tempFolder = new TemporaryFolder();
 
+       protected static MiniClusterResource miniClusterResource;
        protected static MiniDFSCluster hdfsCluster;
        protected static org.apache.hadoop.fs.FileSystem dfs;
        protected static String hdfsURI;
@@ -98,7 +100,7 @@ public class RollingSinkITCase extends 
StreamingMultipleProgramsTestBase {
        protected static File dataDir;
 
        @BeforeClass
-       public static void createHDFS() throws IOException {
+       public static void setup() throws Exception {
 
                LOG.info("In RollingSinkITCase: Starting MiniDFSCluster ");
 
@@ -113,12 +115,22 @@ public class RollingSinkITCase extends 
StreamingMultipleProgramsTestBase {
                hdfsURI = "hdfs://"
                                + 
NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), 
hdfsCluster.getNameNodePort())
                                + "/";
+
+               miniClusterResource = new MiniClusterResource(
+                       new 
MiniClusterResource.MiniClusterResourceConfiguration(
+                               new 
org.apache.flink.configuration.Configuration(),
+                               1,
+                               4));
        }
 
        @AfterClass
-       public static void destroyHDFS() {
+       public static void teardown() throws Exception {
                LOG.info("In RollingSinkITCase: tearing down MiniDFSCluster ");
                hdfsCluster.shutdown();
+
+               if (miniClusterResource != null) {
+                       miniClusterResource.after();
+               }
        }
 
        /**
@@ -926,6 +938,8 @@ public class RollingSinkITCase extends 
StreamingMultipleProgramsTestBase {
        }
 
        private static class StreamWriterWithConfigCheck<T> extends 
StringWriter<T> {
+               private static final long serialVersionUID = 
761584896826819477L;
+
                private String key;
                private String expect;
                public StreamWriterWithConfigCheck(String key, String expect) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index 7595ac0..b76d087 100644
--- 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -25,7 +25,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.modules.HadoopModule;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.test.util.SecureTestEnvironment;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.test.util.TestingSecurityContext;
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.FileWriter;
-import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -94,19 +93,7 @@ public class RollingSinkSecuredITCase extends 
RollingSinkITCase {
         * and out-of-order sequence for secure cluster
         */
        @BeforeClass
-       public static void setup() throws Exception {}
-
-       @AfterClass
-       public static void teardown() throws Exception {}
-
-       @BeforeClass
-       public static void createHDFS() throws IOException {}
-
-       @AfterClass
-       public static void destroyHDFS() {}
-
-       @BeforeClass
-       public static void startSecureCluster() throws Exception {
+       public static void setup() throws Exception {
 
                skipIfHadoopVersionIsNotAppropriate();
 
@@ -158,20 +145,29 @@ public class RollingSinkSecuredITCase extends 
RollingSinkITCase {
                                + 
NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), 
hdfsCluster.getNameNodePort())
                                + "/";
 
-               startSecureFlinkClusterWithRecoveryModeEnabled();
+               Configuration configuration = 
startSecureFlinkClusterWithRecoveryModeEnabled();
+
+               miniClusterResource = new MiniClusterResource(new 
MiniClusterResource.MiniClusterResourceConfiguration(
+                       configuration,
+                       1,
+                       4));
+
+               miniClusterResource.before();
        }
 
        @AfterClass
-       public static void teardownSecureCluster() throws Exception {
+       public static void teardown() throws Exception {
                LOG.info("tearing down secure cluster environment");
 
-               TestStreamEnvironment.unsetAsContext();
-               stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
-
                if (hdfsCluster != null) {
                        hdfsCluster.shutdown();
                }
 
+               if (miniClusterResource != null) {
+                       miniClusterResource.after();
+                       miniClusterResource = null;
+               }
+
                SecureTestEnvironment.cleanup();
        }
 
@@ -208,30 +204,26 @@ public class RollingSinkSecuredITCase extends 
RollingSinkITCase {
                conf.set(DFS_DATANODE_HTTP_ADDRESS_KEY, "localhost:1003");
        }
 
-       private static void startSecureFlinkClusterWithRecoveryModeEnabled() {
+       private static Configuration 
startSecureFlinkClusterWithRecoveryModeEnabled() {
                try {
                        LOG.info("Starting Flink and ZK in secure mode");
 
                        dfs.mkdirs(new Path("/flink/checkpoints"));
                        dfs.mkdirs(new Path("/flink/recovery"));
 
-                       org.apache.flink.configuration.Configuration config = 
new org.apache.flink.configuration.Configuration();
-
-                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
DEFAULT_PARALLELISM);
-                       
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false);
-                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
-                       config.setString(HighAvailabilityOptions.HA_MODE, 
"zookeeper");
-                       config.setString(CoreOptions.STATE_BACKEND, 
"filesystem");
-                       
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI 
+ "/flink/checkpoints");
-                       
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI + 
"/flink/recovery");
-                       config.setString("state.backend.fs.checkpointdir", 
hdfsURI + "/flink/checkpoints");
+                       final Configuration result = new Configuration();
 
-                       
SecureTestEnvironment.populateFlinkSecureConfigurations(config);
+                       
result.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false);
+                       
result.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
+                       result.setString(HighAvailabilityOptions.HA_MODE, 
"zookeeper");
+                       result.setString(CoreOptions.STATE_BACKEND, 
"filesystem");
+                       
result.setString(HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI 
+ "/flink/checkpoints");
+                       
result.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI + 
"/flink/recovery");
+                       result.setString("state.backend.fs.checkpointdir", 
hdfsURI + "/flink/checkpoints");
 
-                       cluster = TestBaseUtils.startCluster(config, false);
-                       TestStreamEnvironment.setAsContext(cluster, 
DEFAULT_PARALLELISM);
+                       
SecureTestEnvironment.populateFlinkSecureConfigurations(result);
 
+                       return result;
                } catch (Exception e) {
                        throw new RuntimeException(e);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
index 46102a2..753d813 100644
--- 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
@@ -23,9 +23,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.OperatingSystem;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -44,11 +42,9 @@ import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
 import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedList;
 
 /**
  * Integration tests for Hadoop IO formats.
@@ -58,14 +54,14 @@ public class HadoopIOFormatsITCase extends 
JavaProgramTestBase {
 
        private static final int NUM_PROGRAMS = 2;
 
-       private int curProgId = config.getInteger("ProgramId", -1);
+       private final int curProgId;
        private String[] resultPath;
        private String[] expectedResult;
        private String sequenceFileInPath;
        private String sequenceFileInPathNull;
 
-       public HadoopIOFormatsITCase(Configuration config) {
-               super(config);
+       public HadoopIOFormatsITCase(int curProgId) {
+               this.curProgId = curProgId;
        }
 
        @Before
@@ -143,17 +139,15 @@ public class HadoopIOFormatsITCase extends 
JavaProgramTestBase {
        }
 
        @Parameters
-       public static Collection<Object[]> getConfigurations() throws 
FileNotFoundException, IOException {
+       public static Collection<Object[]> getConfigurations() {
 
-               LinkedList<Configuration> tConfigs = new 
LinkedList<Configuration>();
+               Collection<Object[]> programIds = new ArrayList<>(NUM_PROGRAMS);
 
                for (int i = 1; i <= NUM_PROGRAMS; i++) {
-                       Configuration config = new Configuration();
-                       config.setInteger("ProgramId", i);
-                       tConfigs.add(config);
+                       programIds.add(new Object[]{i});
                }
 
-               return TestBaseUtils.toParameterList(tConfigs);
+               return programIds;
        }
 
        private static class HadoopIOFormatPrograms {

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
index 145eaaa..db2ad8e 100644
--- 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
@@ -44,7 +44,6 @@ public class HadoopMapredITCase extends JavaProgramTestBase {
        protected void preSubmit() throws Exception {
                textPath = createTempFile("text.txt", WordCountData.TEXT);
                resultPath = getTempDirPath("result");
-               this.setParallelism(4);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
index a23a50d..783a5a6 100644
--- 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
@@ -46,7 +46,6 @@ public class HadoopInputOutputITCase extends 
JavaProgramTestBase {
        protected void preSubmit() throws Exception {
                textPath = createTempFile("text.txt", WordCountData.TEXT);
                resultPath = getTempDirPath("result");
-               this.setParallelism(4);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java
 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java
index d53493c..7152cf2 100644
--- 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java
+++ 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java
@@ -19,7 +19,7 @@ package org.apache.flink.storm.split;
 
 import org.apache.flink.storm.split.SpoutSplitExample.Enrich;
 import org.apache.flink.storm.split.operators.VerifyAndEnrichBolt;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
 
 import org.junit.After;
 import org.junit.Assert;
@@ -32,7 +32,7 @@ import java.io.IOException;
 /**
  * Tests for split examples.
  */
-public class SplitITCase extends StreamingMultipleProgramsTestBase {
+public class SplitITCase extends AbstractTestBase {
 
        private String output;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
index 7543bab..ee06cd4 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
@@ -55,7 +55,6 @@ public class TopSpeedWindowing {
                final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                env.getConfig().setGlobalJobParameters(params);
-               env.setParallelism(1);
 
                @SuppressWarnings({"rawtypes", "serial"})
                DataStream<Tuple4<Integer, Integer, Double, Long>> carData;

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
 
b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
index 4c47d59..cfe899e 100644
--- 
a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
+++ 
b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
@@ -29,8 +29,8 @@ import 
org.apache.flink.streaming.examples.ml.util.IncrementalLearningSkeletonDa
 import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData;
 import org.apache.flink.streaming.examples.windowing.util.SessionWindowingData;
 import org.apache.flink.streaming.test.examples.join.WindowJoinData;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.AbstractTestBase;
 
 import org.apache.commons.io.FileUtils;
 import org.junit.Test;
@@ -40,7 +40,7 @@ import java.io.File;
 /**
  * Integration test for streaming programs in Java examples.
  */
-public class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase 
{
+public class StreamingExamplesITCase extends AbstractTestBase {
 
        @Test
        public void testIterateExample() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
 
b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
index c2f3164..320dd5f 100644
--- 
a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
+++ 
b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
@@ -17,33 +17,47 @@
 
 package org.apache.flink.streaming.test.examples.windowing;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.examples.windowing.TopSpeedWindowing;
 import 
org.apache.flink.streaming.examples.windowing.util.TopSpeedWindowingExampleData;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+import static 
org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory;
 
 /**
  * Tests for {@link TopSpeedWindowing}.
  */
-public class TopSpeedWindowingExampleITCase extends StreamingProgramTestBase {
+public class TopSpeedWindowingExampleITCase extends TestLogger {
 
-       protected String textPath;
-       protected String resultPath;
+       @ClassRule
+       public static TemporaryFolder temporaryFolder = new TemporaryFolder();
 
-       @Override
-       protected void preSubmit() throws Exception {
-               textPath = createTempFile("text.txt", 
TopSpeedWindowingExampleData.CAR_DATA);
-               resultPath = getTempDirPath("result");
-       }
+       @ClassRule
+       public static MiniClusterResource miniClusterResource = new 
MiniClusterResource(
+               new MiniClusterResource.MiniClusterResourceConfiguration(
+                       new Configuration(),
+                       1,
+                       1));
 
-       @Override
-       protected void postSubmit() throws Exception {
-               
compareResultsByLinesInMemory(TopSpeedWindowingExampleData.TOP_SPEEDS, 
resultPath);
-       }
+       @Test
+       public void testTopSpeedWindowingExampleITCase() throws Exception {
+               File inputFile = temporaryFolder.newFile();
+               FileUtils.writeFileUtf8(inputFile, 
TopSpeedWindowingExampleData.CAR_DATA);
+
+               final String resultPath = 
temporaryFolder.newFolder().toURI().toString();
 
-       @Override
-       protected void testProgram() throws Exception {
-               TopSpeedWindowing.main(new String[]{
-                               "--input", textPath,
-                               "--output", resultPath});
+               TopSpeedWindowing.main(new String[] {
+                       "--input", inputFile.getAbsolutePath(),
+                       "--output", resultPath});
+
+               
compareResultsByLinesInMemory(TopSpeedWindowingExampleData.TOP_SPEEDS, 
resultPath);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
 
b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
index a09b22e..91ee9bf 100644
--- 
a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
+++ 
b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.test.socket;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.streaming.examples.socket.SocketWindowWordCount;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.AbstractTestBase;
 
 import org.junit.Test;
 
@@ -38,7 +38,7 @@ import static org.junit.Assert.fail;
 /**
  * Tests for {@link SocketWindowWordCount}.
  */
-public class SocketWindowWordCountITCase extends StreamingMultipleProgramsTestBase {
+public class SocketWindowWordCountITCase extends AbstractTestBase {
 
        @Test
        public void testJavaProgram() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
 
b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
index 24d1444..7407294 100644
--- 
a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
+++ 
b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
@@ -24,7 +24,6 @@ import org.apache.commons.io.FileUtils
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.examples.iteration.util.IterateExampleData
 import org.apache.flink.streaming.examples.ml.util.IncrementalLearningSkeletonData
 import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
@@ -37,7 +36,6 @@ import org.apache.flink.streaming.scala.examples.twitter.TwitterExample
 import org.apache.flink.streaming.scala.examples.windowing.{SessionWindowing, WindowWordCount}
 import org.apache.flink.streaming.scala.examples.wordcount.WordCount
 import org.apache.flink.streaming.test.examples.join.WindowJoinData
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.test.testdata.WordCountData
 import org.apache.flink.test.util.{AbstractTestBase, TestBaseUtils}
 import org.junit.Test
@@ -45,12 +43,12 @@ import org.junit.Test
 /**
  * Integration test for streaming programs in Scala examples.
  */
-class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase {
+class StreamingExamplesITCase extends AbstractTestBase {
 
   @Test
   def testIterateExample(): Unit = {
-    val inputPath = AbstractTestBase.createTempFile("fibonacciInput.txt", IterateExampleData.INPUT_PAIRS)
-    val resultPath = AbstractTestBase.getTempDirPath("result")
+    val inputPath = createTempFile("fibonacciInput.txt", IterateExampleData.INPUT_PAIRS)
+    val resultPath = getTempDirPath("result")
 
     // the example is inherently non-deterministic. The iteration timeout of 5000 ms
     // is frequently not enough to make the test run stable on CI infrastructure
@@ -99,14 +97,14 @@ class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase {
 
   @Test
   def testIncrementalLearningSkeleton(): Unit = {
-    val resultPath = AbstractTestBase.getTempDirPath("result")
+    val resultPath = getTempDirPath("result")
     IncrementalLearningSkeleton.main(Array("--output", resultPath))
     TestBaseUtils.compareResultsByLinesInMemory(IncrementalLearningSkeletonData.RESULTS, resultPath)
   }
 
   @Test
   def testTwitterExample(): Unit = {
-    val resultPath = AbstractTestBase.getTempDirPath("result")
+    val resultPath = getTempDirPath("result")
     TwitterExample.main(Array("--output", resultPath))
     TestBaseUtils.compareResultsByLinesInMemory(
       TwitterExampleData.STREAMING_COUNTS_AS_TUPLES,
@@ -115,7 +113,7 @@ class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase {
 
   @Test
   def testSessionWindowing(): Unit = {
-    val resultPath = AbstractTestBase.getTempDirPath("result")
+    val resultPath = getTempDirPath("result")
     SessionWindowing.main(Array("--output", resultPath))
     TestBaseUtils.compareResultsByLinesInMemory(SessionWindowingData.EXPECTED, resultPath)
   }
@@ -124,8 +122,8 @@ class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase {
   def testWindowWordCount(): Unit = {
     val windowSize = "250"
     val slideSize = "150"
-    val textPath = AbstractTestBase.createTempFile("text.txt", WordCountData.TEXT)
-    val resultPath = AbstractTestBase.getTempDirPath("result")
+    val textPath = createTempFile("text.txt", WordCountData.TEXT)
+    val resultPath = getTempDirPath("result")
 
     WindowWordCount.main(Array(
       "--input", textPath,
@@ -142,8 +140,8 @@ class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase {
 
   @Test
   def testWordCount(): Unit = {
-    val textPath = AbstractTestBase.createTempFile("text.txt", WordCountData.TEXT)
-    val resultPath = AbstractTestBase.getTempDirPath("result")
+    val textPath = createTempFile("text.txt", WordCountData.TEXT)
+    val resultPath = getTempDirPath("result")
 
     WordCount.main(Array(
       "--input", textPath,

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index 81b83a3..4f2383a 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -32,7 +32,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Either;
 
 import org.junit.After;
@@ -48,7 +48,7 @@ import java.util.Map;
  * End to end tests of both CEP operators and {@link NFA}.
  */
 @SuppressWarnings("serial")
-public class CEPITCase extends StreamingMultipleProgramsTestBase {
+public class CEPITCase extends AbstractTestBase {
 
        private String resultPath;
        private String expected;

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
index f3d0309..44f89cc 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
@@ -25,12 +25,12 @@ import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.runtime.utils.JavaStreamTestData;
 import org.apache.flink.table.runtime.utils.StreamITCase;
+import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
 
 import org.junit.Test;
@@ -41,7 +41,7 @@ import java.util.List;
 /**
  * Integration tests for streaming SQL.
  */
-public class JavaSqlITCase extends StreamingMultipleProgramsTestBase {
+public class JavaSqlITCase extends AbstractTestBase {
 
        @Test
        public void testRowRegisterRowWithNames() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
index 820de08..741a3cb 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
@@ -20,13 +20,13 @@ package org.apache.flink.table.api.stream
 
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
+import org.apache.flink.test.util.AbstractTestBase
 import org.junit.Assert.assertEquals
 import org.junit._
 
-class ExplainTest extends StreamingMultipleProgramsTestBase {
+class ExplainTest extends AbstractTestBase {
 
   private val testFilePath = this.getClass.getResource("/").getFile
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala
index c1ad08c..1de2b1e 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala
@@ -19,13 +19,13 @@
 package org.apache.flink.table.api.stream.table.validation
 
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.{TableEnvironment, ValidationException}
 import org.apache.flink.table.runtime.utils.StreamTestData
+import org.apache.flink.test.util.AbstractTestBase
 import org.junit.Test
 
-class UnsupportedOpsValidationTest extends StreamingMultipleProgramsTestBase {
+class UnsupportedOpsValidationTest extends AbstractTestBase {
 
   @Test(expected = classOf[ValidationException])
   def testSort(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
index a301354..c553ee6 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
@@ -18,8 +18,8 @@
 
 package org.apache.flink.table.runtime.stream
 
-import java.math.BigDecimal
 import java.lang.{Integer => JInt, Long => JLong}
+import java.math.BigDecimal
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
@@ -28,15 +28,15 @@ import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.runtime.utils.JavaPojos.Pojo1
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.plan.TimeIndicatorConversionTest.TableFunc
 import org.apache.flink.table.api.{TableEnvironment, TableSchema, Types}
 import org.apache.flink.table.expressions.{ExpressionParser, TimeIntervalUnit}
+import org.apache.flink.table.plan.TimeIndicatorConversionTest.TableFunc
 import org.apache.flink.table.runtime.stream.TimeAttributesITCase.{AtomicTimestampWithEqualWatermark, TestPojo, TimestampWithEqualWatermark, TimestampWithEqualWatermarkPojo}
+import org.apache.flink.table.runtime.utils.JavaPojos.Pojo1
 import org.apache.flink.table.runtime.utils.StreamITCase
 import org.apache.flink.table.utils.{MemoryTableSinkUtil, TestTableSourceWithTime}
+import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit.Test
@@ -46,7 +46,7 @@ import scala.collection.mutable
 /**
   * Tests for access and materialization of time attributes.
   */
-class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
+class TimeAttributesITCase extends AbstractTestBase {
 
   val data = List(
     (1L, 1, 1d, 1f, new BigDecimal("1"), "Hi"),

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
index 30ada56..246ce2e 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
@@ -20,17 +20,17 @@ package org.apache.flink.table.runtime.stream.sql
 
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.utils.{CommonTestData, StreamITCase}
+import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit.Test
 
 import scala.collection.mutable
 
-class TableSourceITCase extends StreamingMultipleProgramsTestBase {
+class TableSourceITCase extends AbstractTestBase {
 
   @Test
   def testCsvTableSource(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
index 46788f5..a20b626 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
@@ -20,20 +20,19 @@ package org.apache.flink.table.runtime.stream.table
 
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
 import org.apache.flink.table.expressions.Literal
 import org.apache.flink.table.expressions.utils.{Func13, RichFunc1, RichFunc2, SplitUDF}
-import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData}
-import org.apache.flink.table.runtime.utils.UserDefinedFunctionTestUtils
+import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, UserDefinedFunctionTestUtils}
+import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit.Test
 
 import scala.collection.mutable
 
-class CalcITCase extends StreamingMultipleProgramsTestBase {
+class CalcITCase extends AbstractTestBase {
 
   @Test
   def testSimpleSelectAll(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
index 215526d..0f563e6 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
@@ -21,19 +21,19 @@ import java.lang.{Boolean => JBoolean}
 
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.{TableEnvironment, Types, ValidationException}
 import org.apache.flink.table.expressions.utils.{Func18, RichFunc2}
 import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, _}
 import org.apache.flink.table.utils._
+import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit.{Before, Test}
 
 import scala.collection.mutable
 
-class CorrelateITCase extends StreamingMultipleProgramsTestBase {
+class CorrelateITCase extends AbstractTestBase {
 
   val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
   val tEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala
index 1eebeee..588cff1 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala
@@ -22,17 +22,17 @@ import java.math.BigDecimal
 
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{CountDistinct, CountDistinctWithMerge, WeightedAvg, WeightedAvgWithMerge}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
 import org.apache.flink.table.functions.aggfunctions.CountAggFunction
 import org.apache.flink.table.runtime.stream.table.GroupWindowITCase._
+import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{CountDistinct, CountDistinctWithMerge, WeightedAvg, WeightedAvgWithMerge}
 import org.apache.flink.table.runtime.utils.StreamITCase
+import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit.Test
@@ -43,7 +43,7 @@ import scala.collection.mutable
   * We only test some aggregations until better testing of constructed DataStream
   * programs is possible.
   */
-class GroupWindowITCase extends StreamingMultipleProgramsTestBase {
+class GroupWindowITCase extends AbstractTestBase {
   private val queryConfig = new StreamQueryConfig()
   queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
index 5e15e14..479bce2 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
@@ -20,18 +20,18 @@ package org.apache.flink.table.runtime.stream.table
 
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.utils.CommonTestData.NonPojo
 import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData}
+import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit.Test
 
 import scala.collection.mutable
 
-class SetOperatorsITCase extends StreamingMultipleProgramsTestBase {
+class SetOperatorsITCase extends AbstractTestBase {
 
   @Test
   def testUnion(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
index b44d8ef..f1badee 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
@@ -31,22 +31,21 @@ import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.{TableEnvironment, TableException, Types}
 import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData}
 import org.apache.flink.table.sinks._
 import org.apache.flink.table.utils.MemoryTableSinkUtil
-import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.test.util.{AbstractTestBase, TestBaseUtils}
 import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
 import org.junit.Assert._
 import org.junit.Test
 
-import scala.collection.mutable
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
-class TableSinkITCase extends StreamingMultipleProgramsTestBase {
+class TableSinkITCase extends AbstractTestBase {
 
   @Test
   def testInsertIntoRegisteredTableSink(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
index c9ea30a..d1a88b7 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
@@ -29,12 +29,12 @@ import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JExecEnv}
 import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.{TableEnvironment, TableException, TableSchema, Types}
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, TableException, TableSchema, Types}
 import org.apache.flink.table.runtime.utils.{CommonTestData, StreamITCase}
 import org.apache.flink.table.sources.StreamTableSource
 import org.apache.flink.table.utils._
+import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
 import org.junit.Assert._
@@ -43,7 +43,7 @@ import org.junit.Test
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-class TableSourceITCase extends StreamingMultipleProgramsTestBase {
+class TableSourceITCase extends AbstractTestBase {
 
   @Test(expected = classOf[TableException])
   def testInvalidDatastreamType(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala
index 8c41f22..5cfab4a 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala
@@ -18,11 +18,11 @@
 package org.apache.flink.table.runtime.utils
 
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.test.util.AbstractTestBase
 import org.junit.Rule
 import org.junit.rules.TemporaryFolder
 
-class StreamingWithStateTestBase extends StreamingMultipleProgramsTestBase {
+class StreamingWithStateTestBase extends AbstractTestBase {
 
   val _tempFolder = new TemporaryFolder
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index b76ade7..6fb06d3 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -70,6 +70,7 @@ import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
 
 import org.hamcrest.core.StringStartsWith;
 import org.junit.Assert;
@@ -90,7 +91,7 @@ import static org.junit.Assert.fail;
  * Tests for {@link DataStream}.
  */
 @SuppressWarnings("serial")
-public class DataStreamTest {
+public class DataStreamTest extends TestLogger {
 
        /**
         * Tests union functionality. This ensures that self-unions and unions of streams

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
 
b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
index fb7c765..2311092 100644
--- 
a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
+++ 
b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.scala.api;
 
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.AbstractTestBase;
 
 import org.junit.After;
 import org.junit.Before;
@@ -34,7 +34,7 @@ import static org.junit.Assert.fail;
 /**
  * IT cases for the {@link org.apache.flink.api.java.io.CsvOutputFormat}.
  */
-public class CsvOutputFormatITCase extends StreamingMultipleProgramsTestBase  {
+public class CsvOutputFormatITCase extends AbstractTestBase {
 
        protected String resultPath;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java
 
b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java
index 84b81e2..c2e450a 100644
--- 
a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java
+++ 
b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.scala.api;
 
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.AbstractTestBase;
 
 import org.junit.After;
 import org.junit.Before;
@@ -34,7 +34,7 @@ import static org.junit.Assert.fail;
 /**
  * IT cases for the {@link org.apache.flink.api.java.io.TextOutputFormat}.
  */
-public class TextOutputFormatITCase extends StreamingMultipleProgramsTestBase {
+public class TextOutputFormatITCase extends AbstractTestBase {
 
        protected String resultPath;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
index fddbe00..5412e8e 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
@@ -27,14 +27,13 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.watermark.Watermark
 import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
 import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-
-import org.junit.Test
+import org.apache.flink.test.util.AbstractTestBase
 import org.junit.Assert._
+import org.junit.Test
 
 import scala.collection.mutable
 
-class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
+class CoGroupJoinITCase extends AbstractTestBase {
 
   @Test
   def testCoGroup(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 60c609d..6158c8e 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.api.scala
 import java.lang
 
 import org.apache.flink.api.common.functions._
-import org.apache.flink.api.common.operators.ResourceSpec
 import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
 import org.apache.flink.streaming.api.functions.ProcessFunction
@@ -32,12 +31,12 @@ import 
org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
 import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, 
PurgingTrigger}
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
 import org.apache.flink.streaming.runtime.partitioner._
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.util.Collector
 import org.junit.Assert._
 import org.junit.Test
 
-class DataStreamTest extends StreamingMultipleProgramsTestBase {
+class DataStreamTest extends AbstractTestBase {
 
   @Test
   def testNaming(): Unit = {
@@ -242,7 +241,8 @@ class DataStreamTest extends 
StreamingMultipleProgramsTestBase {
    */
   @Test
   def testParallelism() {
-    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(10)
+    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+    val parallelism = env.getParallelism
 
     val src = env.fromElements(new Tuple2[Long, Long](0L, 0L))
     val map = src.map(x => (0L, 0L))
@@ -255,9 +255,12 @@ class DataStreamTest extends 
StreamingMultipleProgramsTestBase {
     val sink = map.addSink(x => {})
 
     assert(1 == env.getStreamGraph.getStreamNode(src.getId).getParallelism)
-    assert(10 == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
+    assert(parallelism == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
     assert(1 == env.getStreamGraph.getStreamNode(windowed.getId).getParallelism)
-    assert(10 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
+    assert(parallelism == env
+      .getStreamGraph
+      .getStreamNode(sink.getTransformation.getId)
+      .getParallelism)
 
     try {
       src.setParallelism(3)
@@ -268,18 +271,23 @@ class DataStreamTest extends 
StreamingMultipleProgramsTestBase {
       }
     }
 
-    env.setParallelism(7)
+    val newParallelism = parallelism - 1
+
+    env.setParallelism(newParallelism)
     // the parallelism does not change since some windowing code takes the parallelism from
     // input operations and that cannot change dynamically
     assert(1 == env.getStreamGraph.getStreamNode(src.getId).getParallelism)
-    assert(10 == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
+    assert(parallelism == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
     assert(1 == env.getStreamGraph.getStreamNode(windowed.getId).getParallelism)
-    assert(10 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
+    assert(parallelism == env
+      .getStreamGraph
+      .getStreamNode(sink.getTransformation.getId)
+      .getParallelism)
 
     val parallelSource = env.generateSequence(0, 0)
     parallelSource.print()
 
-    assert(7 == env.getStreamGraph.getStreamNode(parallelSource.getId).getParallelism)
+    assert(newParallelism == env.getStreamGraph.getStreamNode(parallelSource.getId).getParallelism)
 
     parallelSource.setParallelism(3)
     assert(3 == env.getStreamGraph.getStreamNode(parallelSource.getId).getParallelism)

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
index 8e66171..e844928 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
@@ -27,8 +27,8 @@ import org.apache.flink.streaming.api.watermark.Watermark
 import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.test.streaming.runtime.util.TestListResultSink
+import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.util.Collector
 import org.junit.Assert._
 import org.junit.Test
@@ -36,7 +36,7 @@ import org.junit.Test
 /**
  * Integration test for streaming programs using side outputs.
  */
-class SideOutputITCase extends StreamingMultipleProgramsTestBase {
+class SideOutputITCase extends AbstractTestBase {
 
   /**
     * Test ProcessFunction side output.

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
index 35a56d7..db0fb71 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
@@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
 import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.util.Collector
 import org.junit.Assert._
 import org.junit.Test
@@ -39,7 +39,7 @@ import org.junit.Test
   * These tests verify that the api calls on [[WindowedStream]] that use the 
"time" shortcut
   * instantiate the correct window operator.
   */
-class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase {
+class TimeWindowTranslationTest extends AbstractTestBase {
 
   /**
     * Verifies that calls to timeWindow() instantiate a regular

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
index dc38758..ef27685 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
@@ -31,9 +31,9 @@ import org.apache.flink.streaming.api.watermark.Watermark
 import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.{Ignore, Test}
+import org.apache.flink.test.util.AbstractTestBase
 import org.junit.Assert._
+import org.junit.Test
 
 import scala.collection.mutable
 
@@ -41,7 +41,7 @@ import scala.collection.mutable
  * Tests for Folds over windows. These also test whether 
OutputTypeConfigurable functions
  * work for windows, because FoldWindowFunction is OutputTypeConfigurable.
  */
-class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
+class WindowFoldITCase extends AbstractTestBase {
 
   @Test
   def testFoldWindow(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
index ee1dbfd..b2137f5 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
@@ -31,9 +31,9 @@ import org.apache.flink.streaming.api.watermark.Watermark
 import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.test.util.AbstractTestBase
 import org.junit.Assert._
-import org.junit.{Ignore, Test}
+import org.junit.Test
 
 import scala.collection.mutable
 
@@ -41,7 +41,7 @@ import scala.collection.mutable
  * Tests for Folds over windows. These also test whether 
OutputTypeConfigurable functions
  * work for windows, because FoldWindowFunction is OutputTypeConfigurable.
  */
-class WindowReduceITCase extends StreamingMultipleProgramsTestBase {
+class WindowReduceITCase extends AbstractTestBase {
 
   @Test
   def testReduceWindow(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
deleted file mode 100644
index aa6e618..0000000
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.test.util.AbstractTestBase;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Base class for streaming unit tests that run multiple tests and want to 
reuse the same
- * Flink cluster. This saves a significant amount of time, since the startup 
and
- * shutdown of the Flink clusters (including actor systems, etc) usually 
dominates
- * the execution of the actual tests.
- *
- * <p>To write a unit test against this test base, simply extend it and add
- * one or more regular test methods and retrieve the 
StreamExecutionEnvironment from
- * the context:
- *
- * <pre>
- *   {@literal @}Test
- *   public void someTest() {
- *       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
- *       // test code
- *       env.execute();
- *   }
- *
- *   {@literal @}Test
- *   public void anotherTest() {
- *       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
- *       // test code
- *       env.execute();
- *   }
- *
- * </pre>
- */
-public class StreamingMultipleProgramsTestBase extends AbstractTestBase {
-
-       // 
------------------------------------------------------------------------
-       //  The mini cluster that is shared across tests
-       // 
------------------------------------------------------------------------
-
-       protected static final int DEFAULT_PARALLELISM = 4;
-
-       protected static final Logger LOG = 
LoggerFactory.getLogger(StreamingMultipleProgramsTestBase.class);
-
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index bbd250d..65b351d 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -23,8 +23,6 @@ import org.apache.flink.util.FileUtils;
 
 import org.junit.ClassRule;
 import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -42,7 +40,7 @@ import java.io.IOException;
  * <pre>
  *   {@literal @}Test
  *   public void someTest() {
- *       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+ *       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
  *       // test code
  *       env.execute();
  *   }
@@ -58,8 +56,6 @@ import java.io.IOException;
  */
 public abstract class AbstractTestBase extends TestBaseUtils {
 
-       protected static final Logger LOG = 
LoggerFactory.getLogger(AbstractTestBase.class);
-
        private static final int DEFAULT_PARALLELISM = 4;
 
        @ClassRule
@@ -77,17 +73,17 @@ public abstract class AbstractTestBase extends 
TestBaseUtils {
        //  Temporary File Utilities
        // 
--------------------------------------------------------------------------------------------
 
-       public static String getTempDirPath(String dirName) throws IOException {
+       public String getTempDirPath(String dirName) throws IOException {
                File f = createAndRegisterTempFile(dirName);
                return f.toURI().toString();
        }
 
-       public static String getTempFilePath(String fileName) throws 
IOException {
+       public String getTempFilePath(String fileName) throws IOException {
                File f = createAndRegisterTempFile(fileName);
                return f.toURI().toString();
        }
 
-       public static String createTempFile(String fileName, String contents) 
throws IOException {
+       public String createTempFile(String fileName, String contents) throws 
IOException {
                File f = createAndRegisterTempFile(fileName);
                if (!f.getParentFile().exists()) {
                        f.getParentFile().mkdirs();
@@ -97,7 +93,7 @@ public abstract class AbstractTestBase extends TestBaseUtils {
                return f.toURI().toString();
        }
 
-       public static File createAndRegisterTempFile(String fileName) throws 
IOException {
+       public File createAndRegisterTempFile(String fileName) throws 
IOException {
                return new File(TEMPORARY_FOLDER.newFolder(), fileName);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index c8872ac..20dbebb 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -19,6 +19,7 @@
 package org.apache.flink.test.util;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -60,11 +61,13 @@ public class MiniClusterResource extends ExternalResource {
 
        @Override
        public void before() throws Exception {
+               final Configuration configuration = new Configuration(miniClusterResourceConfiguration.getConfiguration());
+
+               configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, miniClusterResourceConfiguration.getNumberTaskManagers());
+               configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, miniClusterResourceConfiguration.numberSlotsPerTaskManager);
+
                localFlinkMiniCluster = TestBaseUtils.startCluster(
-                       miniClusterResourceConfiguration.getNumberTaskManagers(),
-                       miniClusterResourceConfiguration.getNumberSlotsPerTaskManager(),
-                       false,
-                       false,
+                       configuration,
                        true);
 
                numberSlots = 
miniClusterResourceConfiguration.getNumberSlotsPerTaskManager() * 
miniClusterResourceConfiguration.getNumberTaskManagers();

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
index 06792ea..10039e6 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
@@ -18,12 +18,8 @@
 
 package org.apache.flink.test.util;
 
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-
 import org.junit.After;
-import org.junit.AfterClass;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.runners.Parameterized;
 
 import java.util.Arrays;
@@ -57,7 +53,7 @@ import java.util.Collection;
  *
  * }</pre>
  */
-public class MultipleProgramsTestBase extends TestBaseUtils {
+public class MultipleProgramsTestBase extends AbstractTestBase {
 
        /**
         * Enum that defines which execution environment to run the next test 
on:
@@ -70,16 +66,6 @@ public class MultipleProgramsTestBase extends TestBaseUtils {
        }
 
        // 
------------------------------------------------------------------------
-       //  The mini cluster that is shared across tests
-       // 
------------------------------------------------------------------------
-
-       protected static final int DEFAULT_PARALLELISM = 4;
-
-       protected static boolean startWebServer = false;
-
-       protected static LocalFlinkMiniCluster cluster = null;
-
-       // 
------------------------------------------------------------------------
 
        protected final TestExecutionMode mode;
 
@@ -93,12 +79,21 @@ public class MultipleProgramsTestBase extends TestBaseUtils 
{
 
        @Before
        public void setupEnvironment() {
+               TestEnvironment testEnvironment;
                switch(mode){
                        case CLUSTER:
-                               new TestEnvironment(cluster, 4, false).setAsContext();
+                               // This only works because of the quirks we built in the TestEnvironment.
+                               // We should refactor this in the future!!!
+                               testEnvironment = miniClusterResource.getTestEnvironment();
+                               testEnvironment.getConfig().disableObjectReuse();
+                               testEnvironment.setAsContext();
                                break;
                        case CLUSTER_OBJECT_REUSE:
-                               new TestEnvironment(cluster, 4, true).setAsContext();
+                               // This only works because of the quirks we built in the TestEnvironment.
+                               // We should refactor this in the future!!!
+                               testEnvironment = miniClusterResource.getTestEnvironment();
+                               testEnvironment.getConfig().enableObjectReuse();
+                               testEnvironment.setAsContext();
                                break;
                        case COLLECTION:
                                new CollectionTestEnvironment().setAsContext();
@@ -120,25 +115,6 @@ public class MultipleProgramsTestBase extends 
TestBaseUtils {
        }
 
        // 
------------------------------------------------------------------------
-       //  Cluster setup & teardown
-       // 
------------------------------------------------------------------------
-
-       @BeforeClass
-       public static void setup() throws Exception {
-               cluster = TestBaseUtils.startCluster(
-                       1,
-                       DEFAULT_PARALLELISM,
-                       startWebServer,
-                       false,
-                       true);
-       }
-
-       @AfterClass
-       public static void teardown() throws Exception {
-               stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
-       }
-
-       // 
------------------------------------------------------------------------
        //  Parametrization lets the tests run in cluster and collection mode
        // 
------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index d2237ad..5a96326 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -459,7 +459,7 @@ public class TestBaseUtils extends TestLogger {
                                throw new IllegalArgumentException("This path 
does not denote a local file.");
                        }
                } catch (URISyntaxException | NullPointerException e) {
-                       throw new IllegalArgumentException("This path does not 
describe a valid local file URI.");
+                       throw new IllegalArgumentException("This path does not 
describe a valid local file URI.", e);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
index d224905..b207de8 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
@@ -30,7 +30,7 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.util.Collector;
 
@@ -57,7 +57,7 @@ import static org.junit.Assert.assertTrue;
  * state reflects the "exactly once" semantics.
  */
 @SuppressWarnings({"serial", "deprecation"})
-public class CoStreamCheckpointingITCase extends 
StreamingMultipleProgramsTestBase {
+public class CoStreamCheckpointingITCase extends AbstractTestBase {
 
        private static final long NUM_STRINGS = 10_000L;
        private static final int PARALLELISM = 4;

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
index 16d8b54..7b058a0 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
@@ -34,7 +34,7 @@ import 
org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.Collector;
 
 import org.junit.Test;
@@ -69,7 +69,7 @@ import static org.junit.Assert.fail;
  * successfully completed checkpoint.
  */
 @SuppressWarnings("serial")
-public class StreamCheckpointNotifierITCase extends 
StreamingMultipleProgramsTestBase {
+public class StreamCheckpointNotifierITCase extends AbstractTestBase {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(StreamCheckpointNotifierITCase.class);
 

Reply via email to