Repository: flink
Updated Branches:
  refs/heads/master 0973e34b3 -> de10b4009


[FLINK-9136][tests] Remove StreamingProgramTestBase

This closes #5817.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/de10b400
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/de10b400
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/de10b400

Branch: refs/heads/master
Commit: de10b40095bc0109faec3874d452b33586f35e7a
Parents: 0973e34
Author: zentol <[email protected]>
Authored: Wed Apr 4 19:22:53 2018 +0200
Committer: zentol <[email protected]>
Committed: Wed May 2 20:46:46 2018 +0200

----------------------------------------------------------------------
 .../exclamation/ExclamationWithBoltITCase.java  | 30 ++++-------
 .../exclamation/ExclamationWithSpoutITCase.java | 27 ++++------
 .../StormExclamationLocalITCase.java            | 30 ++++-------
 .../flink/storm/join/SingleJoinITCase.java      | 27 ++++------
 .../storm/tests/StormFieldsGroupingITCase.java  | 54 +++++++++----------
 .../flink/storm/tests/StormMetaDataITCase.java  |  9 ++--
 .../flink/storm/tests/StormUnionITCase.java     | 24 ++++-----
 .../wordcount/BoltTokenizerWordCountITCase.java | 27 ++++------
 .../BoltTokenizerWordCountPojoITCase.java       | 27 ++++------
 .../BoltTokenizerWordCountWithNamesITCase.java  | 27 ++++------
 .../wordcount/SpoutSourceWordCountITCase.java   | 27 ++++------
 .../storm/wordcount/WordCountLocalITCase.java   | 27 ++++------
 .../wordcount/WordCountLocalNamedITCase.java    | 27 ++++------
 .../TopSpeedWindowingExampleITCase.java         | 25 ++++-----
 .../ContinuousFileProcessingITCase.java         | 14 ++---
 .../python/api/PythonStreamBinderTest.java      |  7 +--
 .../scala/api/StatefulFunctionITCase.java       | 10 ++--
 .../util/StreamingProgramTestBase.java          | 57 --------------------
 .../api/outputformat/CsvOutputFormatITCase.java | 20 +++----
 .../outputformat/TextOutputFormatITCase.java    | 20 +++----
 20 files changed, 178 insertions(+), 338 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java
index 358919f..f0725e9 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java
@@ -19,33 +19,25 @@
 package org.apache.flink.storm.exclamation;
 
 import org.apache.flink.storm.exclamation.util.ExclamationData;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
 import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Test;
 
 /**
  * Test for the ExclamationWithBolt example.
  */
-public class ExclamationWithBoltITCase extends StreamingProgramTestBase {
-
-       protected String textPath;
-       protected String resultPath;
-       protected String exclamationNum;
+public class ExclamationWithBoltITCase extends AbstractTestBase {
 
-       @Override
-       protected void preSubmit() throws Exception {
-               this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
-               this.resultPath = this.getTempDirPath("result");
-               this.exclamationNum = "3";
-       }
+       @Test
+       public void testProgram() throws Exception {
+               String textPath = createTempFile("text.txt", WordCountData.TEXT);
+               String resultPath = getTempDirPath("result");
+               String exclamationNum = "3";
 
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
-       }
+               ExclamationWithBolt.main(new String[]{textPath, resultPath, exclamationNum});
 
-       @Override
-       protected void testProgram() throws Exception {
-               ExclamationWithBolt.main(new String[]{this.textPath, this.resultPath, this.exclamationNum});
+               compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, resultPath);
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java
index 64294d1..4d16c4a 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java
@@ -19,30 +19,23 @@
 package org.apache.flink.storm.exclamation;
 
 import org.apache.flink.storm.exclamation.util.ExclamationData;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
 import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Test;
 
 /**
  * Test for the ExclamationWithSpout example.
  */
-public class ExclamationWithSpoutITCase extends StreamingProgramTestBase {
-
-       protected String textPath;
-       protected String resultPath;
+public class ExclamationWithSpoutITCase extends AbstractTestBase {
 
-       @Override
-       protected void preSubmit() throws Exception {
-               this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
-               this.resultPath = this.getTempDirPath("result");
-       }
+       @Test
+       public void testProgram() throws Exception {
+               String textPath = createTempFile("text.txt", WordCountData.TEXT);
+               String resultPath = getTempDirPath("result");
 
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
-       }
+               ExclamationWithSpout.main(new String[]{textPath, resultPath});
 
-       @Override
-       protected void testProgram() throws Exception {
-               ExclamationWithSpout.main(new String[]{this.textPath, this.resultPath});
+               compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, resultPath);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java
index bc09a3d..c82da37 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java
@@ -19,32 +19,24 @@
 package org.apache.flink.storm.exclamation;
 
 import org.apache.flink.storm.exclamation.util.ExclamationData;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
 import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Test;
 
 /**
  * Test for the ExclamationLocal example.
  */
-public class StormExclamationLocalITCase extends StreamingProgramTestBase {
-
-       protected String textPath;
-       protected String resultPath;
-       protected String exclamationNum;
+public class StormExclamationLocalITCase extends AbstractTestBase {
 
-       @Override
-       protected void preSubmit() throws Exception {
-               this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
-               this.resultPath = this.getTempDirPath("result");
-               this.exclamationNum = "3";
-       }
+       @Test
+       public void testProgram() throws Exception {
+               String textPath = createTempFile("text.txt", WordCountData.TEXT);
+               String resultPath = getTempDirPath("result");
+               String exclamationNum = "3";
 
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
-       }
+               ExclamationLocal.main(new String[]{textPath, resultPath, exclamationNum});
 
-       @Override
-       protected void testProgram() throws Exception {
-               ExclamationLocal.main(new String[]{this.textPath, this.resultPath, this.exclamationNum});
+               compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, resultPath);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java
index 5d406db..c00c154 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java
@@ -18,14 +18,16 @@
 
 package org.apache.flink.storm.join;
 
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
 
 import org.apache.flink.shaded.guava18.com.google.common.base.Joiner;
 
+import org.junit.Test;
+
 /**
  * Test for the SingleJoin example.
  */
-public class SingleJoinITCase extends StreamingProgramTestBase {
+public class SingleJoinITCase extends AbstractTestBase {
 
        protected static String[] expectedOutput = {
                        "(male,20)",
@@ -40,23 +42,14 @@ public class SingleJoinITCase extends StreamingProgramTestBase {
                        "(female,29)"
        };
 
-       protected String resultPath;
-
-       @Override
-       protected void preSubmit() throws Exception {
-               this.resultPath = this.getTempDirPath("result");
-       }
-
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(Joiner.on("\n").join(expectedOutput), this.resultPath);
-       }
-
-       @Override
-       protected void testProgram() throws Exception {
+       @Test
+       public void testProgram() throws Exception {
+               String resultPath = getTempDirPath("result");
                // We need to remove the file scheme because we can't use the Flink file system.
                // (to remain compatible with Storm)
-               SingleJoinExample.main(new String[]{ this.resultPath.replace("file:", "") });
+               SingleJoinExample.main(new String[]{resultPath.replace("file:", "")});
+
+               compareResultsByLinesInMemory(Joiner.on("\n").join(expectedOutput), resultPath);
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
index 6e02d81..6905953 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
@@ -24,13 +24,14 @@ import org.apache.flink.storm.tests.operators.FiniteRandomSpout;
 import org.apache.flink.storm.tests.operators.TaskIdBolt;
 import org.apache.flink.storm.util.BoltFileSink;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.MathUtils;
 
 import org.apache.storm.Config;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.tuple.Fields;
 import org.junit.Assert;
+import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -41,24 +42,36 @@ import java.util.List;
  * This test relies on the hash function used by the {@link DataStream#keyBy}, which is
  * assumed to be {@link MathUtils#murmurHash}.
  */
-public class StormFieldsGroupingITCase extends StreamingProgramTestBase {
+public class StormFieldsGroupingITCase extends AbstractTestBase {
 
        private static final String topologyId = "FieldsGrouping Test";
        private static final String spoutId = "spout";
        private static final String boltId = "bolt";
        private static final String sinkId = "sink";
-       private String resultPath;
 
-       @Override
-       protected void preSubmit() throws Exception {
-               this.resultPath = this.getTempDirPath("result");
-       }
+       @Test
+       public void testProgram() throws Exception {
+               String resultPath = this.getTempDirPath("result");
+
+               final String[] tokens = resultPath.split(":");
+               final String outputFile = tokens[tokens.length - 1];
+
+               final TopologyBuilder builder = new TopologyBuilder();
+
+               builder.setSpout(spoutId, new FiniteRandomSpout(0, 10, 2));
+               builder.setBolt(boltId, new TaskIdBolt(), 2).fieldsGrouping(
+                               spoutId, FiniteRandomSpout.STREAM_PREFIX + 0, new Fields("number"));
+               builder.setBolt(sinkId, new BoltFileSink(outputFile)).shuffleGrouping(boltId);
+
+               final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
+               Config conf = new Config();
+               conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true); // only required to stabilize integration test
+               cluster.submitTopology(topologyId, conf, FlinkTopology.createTopology(builder));
+               cluster.shutdown();
 
-       @Override
-       protected void postSubmit() throws Exception {
                List<String> expectedResults = Arrays.asList(
-                               "-1155484576", "1033096058", "-1930858313", "1431162155", "-1557280266", "-1728529858", "1654374947",
-                               "-65105105", "-518907128", "-252332814");
+                       "-1155484576", "1033096058", "-1930858313", "1431162155", "-1557280266", "-1728529858", "1654374947",
+                       "-65105105", "-518907128", "-252332814");
 
                List<String> actualResults = new ArrayList<>();
                readAllResultLines(actualResults, resultPath, new String[0], false);
@@ -82,23 +95,4 @@ public class StormFieldsGroupingITCase extends StreamingProgramTestBase {
                }
        }
 
-       @Override
-       protected void testProgram() throws Exception {
-               final String[] tokens = this.resultPath.split(":");
-               final String outputFile = tokens[tokens.length - 1];
-
-               final TopologyBuilder builder = new TopologyBuilder();
-
-               builder.setSpout(spoutId, new FiniteRandomSpout(0, 10, 2));
-               builder.setBolt(boltId, new TaskIdBolt(), 2).fieldsGrouping(
-                               spoutId, FiniteRandomSpout.STREAM_PREFIX + 0, new Fields("number"));
-               builder.setBolt(sinkId, new BoltFileSink(outputFile)).shuffleGrouping(boltId);
-
-               final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
-               Config conf = new Config();
-               conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true); // only required to stabilize integration test
-               cluster.submitTopology(topologyId, conf, FlinkTopology.createTopology(builder));
-               cluster.shutdown();
-       }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormMetaDataITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormMetaDataITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormMetaDataITCase.java
index fe09daf..c24a95e 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormMetaDataITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormMetaDataITCase.java
@@ -22,24 +22,25 @@ import org.apache.flink.storm.api.FlinkLocalCluster;
 import org.apache.flink.storm.api.FlinkTopology;
 import org.apache.flink.storm.tests.operators.MetaDataSpout;
 import org.apache.flink.storm.tests.operators.VerifyMetaDataBolt;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
 
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.utils.Utils;
 import org.junit.Assert;
+import org.junit.Test;
 
 /**
  * Test for meta data spouts/bolts.
  */
-public class StormMetaDataITCase extends StreamingProgramTestBase {
+public class StormMetaDataITCase extends AbstractTestBase {
 
        private static final String topologyId = "FieldsGrouping Test";
        private static final String spoutId = "spout";
        private static final String boltId1 = "bolt1";
        private static final String boltId2 = "bolt2";
 
-       @Override
-       protected void testProgram() throws Exception {
+       @Test
+       public void testProgram() throws Exception {
                final TopologyBuilder builder = new TopologyBuilder();
 
                builder.setSpout(spoutId, new MetaDataSpout(), 2);

http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormUnionITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormUnionITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormUnionITCase.java
index 12e897a..6f6e47f 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormUnionITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormUnionITCase.java
@@ -23,15 +23,16 @@ import org.apache.flink.storm.api.FlinkTopology;
 import org.apache.flink.storm.tests.operators.FiniteRandomSpout;
 import org.apache.flink.storm.tests.operators.MergerBolt;
 import org.apache.flink.storm.util.BoltFileSink;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
 
 import org.apache.storm.Config;
 import org.apache.storm.topology.TopologyBuilder;
+import org.junit.Test;
 
 /**
  * Test for the {@link MergerBolt}.
  */
-public class StormUnionITCase extends StreamingProgramTestBase {
+public class StormUnionITCase extends AbstractTestBase {
 
        private static final String RESULT = "-1154715079\n" + "-1155869325\n" + "-1155484576\n"
                        + "431529176\n" + "1260042744\n" + "1761283695\n" + "1749940626\n" + "892128508\n"
@@ -47,20 +48,11 @@ public class StormUnionITCase extends StreamingProgramTestBase {
        private static final String spoutId3 = "spout3";
        private static final String boltId = "merger";
        private static final String sinkId = "sink";
-       private String resultPath;
 
-       @Override
-       protected void preSubmit() throws Exception {
-               this.resultPath = this.getTempDirPath("result");
-       }
-
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(RESULT, this.resultPath);
-       }
+       @Test
+       public void testProgram() throws Exception {
+               String resultPath = this.getTempDirPath("result");
 
-       @Override
-       protected void testProgram() throws Exception {
                final TopologyBuilder builder = new TopologyBuilder();
 
                // get input data
@@ -73,7 +65,7 @@ public class StormUnionITCase extends StreamingProgramTestBase {
                                .shuffleGrouping(spoutId2, FiniteRandomSpout.STREAM_PREFIX + 0)
                                .shuffleGrouping(spoutId3, FiniteRandomSpout.STREAM_PREFIX + 0);
 
-               final String[] tokens = this.resultPath.split(":");
+               final String[] tokens = resultPath.split(":");
                final String outputFile = tokens[tokens.length - 1];
                builder.setBolt(sinkId, new BoltFileSink(outputFile)).shuffleGrouping(boltId);
 
@@ -83,6 +75,8 @@ public class StormUnionITCase extends StreamingProgramTestBase {
                conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true); // only required to stabilize integration test
                cluster.submitTopology(topologyId, conf, FlinkTopology.createTopology(builder));
                cluster.shutdown();
+
+               compareResultsByLinesInMemory(RESULT, resultPath);
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountITCase.java
index d1cc5a2..57e5d42 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountITCase.java
@@ -18,31 +18,24 @@
 
 package org.apache.flink.storm.wordcount;
 
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
 import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Test;
 
 /**
  * Test for the BoltTokenizerWordCount example.
  */
-public class BoltTokenizerWordCountITCase extends StreamingProgramTestBase {
-
-       protected String textPath;
-       protected String resultPath;
+public class BoltTokenizerWordCountITCase extends AbstractTestBase {
 
-       @Override
-       protected void preSubmit() throws Exception {
-               this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
-               this.resultPath = this.getTempDirPath("result");
-       }
+       @Test
+       public void testProgram() throws Exception {
+               String textPath = createTempFile("text.txt", WordCountData.TEXT);
+               String resultPath = getTempDirPath("result");
 
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
-       }
+               BoltTokenizerWordCount.main(new String[]{textPath, resultPath});
 
-       @Override
-       protected void testProgram() throws Exception {
-               BoltTokenizerWordCount.main(new String[]{this.textPath, this.resultPath});
+               compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojoITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojoITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojoITCase.java
index 0eb4a6e..656700e 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojoITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojoITCase.java
@@ -18,31 +18,24 @@
 
 package org.apache.flink.storm.wordcount;
 
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
 import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Test;
 
 /**
  * Test for the BoltTokenizerWordCountPojo example.
  */
-public class BoltTokenizerWordCountPojoITCase extends StreamingProgramTestBase {
-
-       protected String textPath;
-       protected String resultPath;
+public class BoltTokenizerWordCountPojoITCase extends AbstractTestBase {
 
-       @Override
-       protected void preSubmit() throws Exception {
-               this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
-               this.resultPath = this.getTempDirPath("result");
-       }
+       @Test
+       public void testProgram() throws Exception {
+               String textPath = createTempFile("text.txt", WordCountData.TEXT);
+               String resultPath = getTempDirPath("result");
 
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
-       }
+               BoltTokenizerWordCountPojo.main(new String[]{textPath, resultPath});
 
-       @Override
-       protected void testProgram() throws Exception {
-               BoltTokenizerWordCountPojo.main(new String[]{this.textPath, this.resultPath});
+               compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNamesITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNamesITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNamesITCase.java
index 8879d9e..18e1f01 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNamesITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNamesITCase.java
@@ -18,31 +18,24 @@
 
 package org.apache.flink.storm.wordcount;
 
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
 import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Test;
 
 /**
  * Test for the BoltTokenizerWordCountWithNames example.
  */
-public class BoltTokenizerWordCountWithNamesITCase extends StreamingProgramTestBase {
-
-       protected String textPath;
-       protected String resultPath;
+public class BoltTokenizerWordCountWithNamesITCase extends AbstractTestBase {
 
-       @Override
-       protected void preSubmit() throws Exception {
-               this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
-               this.resultPath = this.getTempDirPath("result");
-       }
+       @Test
+       public void testProgram() throws Exception {
+               String textPath = createTempFile("text.txt", WordCountData.TEXT);
+               String resultPath = getTempDirPath("result");
 
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
-       }
+               BoltTokenizerWordCountWithNames.main(new String[]{textPath, resultPath});
 
-       @Override
-       protected void testProgram() throws Exception {
-               BoltTokenizerWordCountWithNames.main(new String[]{this.textPath, this.resultPath});
+               compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/SpoutSourceWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/SpoutSourceWordCountITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/SpoutSourceWordCountITCase.java
index ec2ca2c..594f56e 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/SpoutSourceWordCountITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/SpoutSourceWordCountITCase.java
@@ -18,31 +18,24 @@
 
 package org.apache.flink.storm.wordcount;
 
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
 import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Test;
 
 /**
  * Test for the SpoutSourceWordCount example.
  */
-public class SpoutSourceWordCountITCase extends StreamingProgramTestBase {
-
-       protected String textPath;
-       protected String resultPath;
+public class SpoutSourceWordCountITCase extends AbstractTestBase {
 
-       @Override
-       protected void preSubmit() throws Exception {
-               this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
-               this.resultPath = this.getTempDirPath("result");
-       }
+       @Test
+       public void testProgram() throws Exception {
+               String textPath = createTempFile("text.txt", WordCountData.TEXT);
+               String resultPath = getTempDirPath("result");
 
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
-       }
+               SpoutSourceWordCount.main(new String[]{textPath, resultPath});
 
-       @Override
-       protected void testProgram() throws Exception {
-               SpoutSourceWordCount.main(new String[]{this.textPath, this.resultPath});
+               compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java
index 471afa9..16844e5 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java
@@ -18,31 +18,24 @@
 
 package org.apache.flink.storm.wordcount;
 
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
 import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Test;
 
 /**
  * Test for the WordCountLocal example.
  */
-public class WordCountLocalITCase extends StreamingProgramTestBase {
-
-       protected String textPath;
-       protected String resultPath;
+public class WordCountLocalITCase extends AbstractTestBase {
 
-       @Override
-       protected void preSubmit() throws Exception {
-               this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
-               this.resultPath = this.getTempDirPath("result");
-       }
+       @Test
+       public void testProgram() throws Exception {
+               String textPath = createTempFile("text.txt", WordCountData.TEXT);
+               String resultPath = getTempDirPath("result");
 
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
-       }
+               WordCountLocal.main(new String[]{textPath, resultPath});
 
-       @Override
-       protected void testProgram() throws Exception {
-               WordCountLocal.main(new String[] { this.textPath, this.resultPath });
+               compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java
index 445ea37..0353c2c 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java
@@ -18,31 +18,24 @@
 
 package org.apache.flink.storm.wordcount;
 
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
 import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Test;
 
 /**
  * Test for the WordCountLocalByName example.
  */
-public class WordCountLocalNamedITCase extends StreamingProgramTestBase {
-
-       protected String textPath;
-       protected String resultPath;
+public class WordCountLocalNamedITCase extends AbstractTestBase {
 
-       @Override
-       protected void preSubmit() throws Exception {
-               this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
-               this.resultPath = this.getTempDirPath("result");
-       }
+       @Test
+       public void testProgram() throws Exception {
+               String textPath = createTempFile("text.txt", WordCountData.TEXT);
+               String resultPath = getTempDirPath("result");
 
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
-       }
+               WordCountLocalByName.main(new String[]{textPath, resultPath});
 
-       @Override
-       protected void testProgram() throws Exception {
-               WordCountLocalByName.main(new String[] { this.textPath, this.resultPath });
+               compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java
index db27c60..6a98096 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java
@@ -19,31 +19,24 @@ package org.apache.flink.streaming.test.scala.examples.windowing;
 
 import org.apache.flink.streaming.examples.windowing.util.TopSpeedWindowingExampleData;
 import org.apache.flink.streaming.scala.examples.windowing.TopSpeedWindowing;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Test;
 
 /**
  * Tests for {@link TopSpeedWindowing}.
  */
-public class TopSpeedWindowingExampleITCase extends StreamingProgramTestBase {
-       protected String textPath;
-       protected String resultPath;
-
-       @Override
-       protected void preSubmit() throws Exception {
-               textPath = createTempFile("text.txt", TopSpeedWindowingExampleData.CAR_DATA);
-               resultPath = getTempDirPath("result");
-       }
+public class TopSpeedWindowingExampleITCase extends AbstractTestBase {
 
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(TopSpeedWindowingExampleData.TOP_CASE_CLASS_SPEEDS, resultPath);
-       }
+       @Test
+       public void testProgram() throws Exception {
+               String textPath = createTempFile("text.txt", TopSpeedWindowingExampleData.CAR_DATA);
+               String resultPath = getTempDirPath("result");
 
-       @Override
-       protected void testProgram() throws Exception {
                TopSpeedWindowing.main(new String[]{
                                "--input", textPath,
                                "--output", resultPath});
 
+               compareResultsByLinesInMemory(TopSpeedWindowingExampleData.TOP_CASE_CLASS_SPEEDS, resultPath);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
index 42fddf5..65e46b5 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
@@ -33,7 +33,7 @@ import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringF
 import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
 import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
 import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
 
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileUtil;
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
@@ -58,7 +59,7 @@ import static org.junit.Assert.assertEquals;
 /**
  * IT cases for the {@link ContinuousFileMonitoringFunction} and {@link ContinuousFileReaderOperator}.
  */
-public class ContinuousFileProcessingITCase extends StreamingProgramTestBase {
+public class ContinuousFileProcessingITCase extends AbstractTestBase {
 
        private static final int NO_OF_FILES = 5;
        private static final int LINES_PER_FILE = 100;
@@ -110,8 +111,8 @@ public class ContinuousFileProcessingITCase extends StreamingProgramTestBase {
 
        //                                              END OF PREPARATIONS
 
-       @Override
-       protected void testProgram() throws Exception {
+       @Test
+       public void testProgram() throws Exception {
 
                /*
                * This test checks the interplay between the monitor and the reader
@@ -159,11 +160,6 @@ public class ContinuousFileProcessingITCase extends StreamingProgramTestBase {
                                        Throwable th = e;
                                        for (int depth = 0; depth < 20; depth++) {
                                                if (th instanceof SuccessException) {
-                                                       try {
-                                                               postSubmit();
-                                                       } catch (Exception e1) {
-                                                               e1.printStackTrace();
-                                                       }
                                                        return;
                                                } else if (th.getCause() != null) {
                                                        th = th.getCause();

http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java b/flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
index 9b3ddd3..e03c9ef 100644
--- a/flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
+++ b/flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
@@ -24,9 +24,10 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.Preconditions;
 
+import org.junit.Test;
 import org.python.core.PyException;
 
 import java.util.ArrayList;
@@ -35,7 +36,7 @@ import java.util.List;
 /**
  * Tests for the {@link PythonStreamBinder}.
  */
-public class PythonStreamBinderTest extends StreamingProgramTestBase {
+public class PythonStreamBinderTest extends AbstractTestBase {
 
        private static Path getBaseTestPythonDir() {
                FileSystem fs = new LocalFileSystem();
@@ -60,7 +61,7 @@ public class PythonStreamBinderTest extends StreamingProgramTestBase {
                return files;
        }
 
-       @Override
+       @Test
        public void testProgram() throws Exception {
                Path testEntryPoint = new Path(getBaseTestPythonDir(), "examples/word_count.py");
                List<String> testFiles = findTestFiles();

http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/StatefulFunctionITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/StatefulFunctionITCase.java b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/StatefulFunctionITCase.java
index 2cb87b6..40e8ad9 100644
--- a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/StatefulFunctionITCase.java
+++ b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/StatefulFunctionITCase.java
@@ -18,15 +18,17 @@
 package org.apache.flink.streaming.scala.api;
 
 import org.apache.flink.streaming.api.scala.StateTestPrograms;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Test;
 
 /**
  * IT case using stateful functions.
  */
-public class StatefulFunctionITCase extends StreamingProgramTestBase {
+public class StatefulFunctionITCase extends AbstractTestBase {
 
-       @Override
-       protected void testProgram() throws Exception {
+       @Test
+       public void testProgram() throws Exception {
                StateTestPrograms.testStatefulFunctions();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
deleted file mode 100644
index a18be08..0000000
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.test.util.AbstractTestBase;
-
-import org.junit.Test;
-
-/**
- * Base class for unit tests that run a single test.
- *
- * <p>To write a unit test against this test base, simply extend it and implement the {@link #testProgram()} method.
- */
-public abstract class StreamingProgramTestBase extends AbstractTestBase {
-
-       // --------------------------------------------------------------------------------------------
-       //  Methods to create the test program and for pre- and post- test work
-       // --------------------------------------------------------------------------------------------
-
-       protected abstract void testProgram() throws Exception;
-
-       protected void preSubmit() throws Exception {}
-
-       protected void postSubmit() throws Exception {}
-
-       // --------------------------------------------------------------------------------------------
-       //  Test entry point
-       // --------------------------------------------------------------------------------------------
-
-       @Test
-       public void testJob() throws Exception {
-               // pre-submit
-               preSubmit();
-
-               // call the test program
-               testProgram();
-
-               // post-submit
-               postSubmit();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java
index a95865a..cd85d26 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java
@@ -20,24 +20,21 @@ package org.apache.flink.test.streaming.api.outputformat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.testfunctions.Tokenizer;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Test;
 
 /**
  * Integration tests for {@link org.apache.flink.api.java.io.CsvOutputFormat}.
  */
-public class CsvOutputFormatITCase extends StreamingProgramTestBase {
-
-       protected String resultPath;
+public class CsvOutputFormatITCase extends AbstractTestBase {
 
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-       }
+       @Test
+       public void testProgram() throws Exception {
+               String resultPath = getTempDirPath("result");
 
-       @Override
-       protected void testProgram() throws Exception {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
                DataStream<String> text = env.fromElements(WordCountData.TEXT);
@@ -49,10 +46,7 @@ public class CsvOutputFormatITCase extends StreamingProgramTestBase {
                counts.writeAsCsv(resultPath);
 
                env.execute("WriteAsCsvTest");
-       }
 
-       @Override
-       protected void postSubmit() throws Exception {
                //Strip the parentheses from the expected text like output
                compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES
                                .replaceAll("[\\\\(\\\\)]", ""), resultPath);

http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java
index 7f0ebc9..d5a5b5c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java
@@ -20,24 +20,21 @@ package org.apache.flink.test.streaming.api.outputformat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.testfunctions.Tokenizer;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Test;
 
 /**
  * Integration tests for {@link org.apache.flink.api.java.io.TextOutputFormat}.
  */
-public class TextOutputFormatITCase extends StreamingProgramTestBase {
-
-       protected String resultPath;
+public class TextOutputFormatITCase extends AbstractTestBase {
 
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-       }
+       @Test
+       public void testProgram() throws Exception {
+               String resultPath = getTempDirPath("result");
 
-       @Override
-       protected void testProgram() throws Exception {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
                DataStream<String> text = env.fromElements(WordCountData.TEXT);
@@ -49,10 +46,7 @@ public class TextOutputFormatITCase extends StreamingProgramTestBase {
                counts.writeAsText(resultPath);
 
                env.execute("WriteAsTextTest");
-       }
 
-       @Override
-       protected void postSubmit() throws Exception {
                compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
        }
 

Reply via email to