Repository: flink Updated Branches: refs/heads/master 0973e34b3 -> de10b4009
[FLINK-9136][tests] Remove StreamingProgramTestBase This closes #5817. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/de10b400 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/de10b400 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/de10b400 Branch: refs/heads/master Commit: de10b40095bc0109faec3874d452b33586f35e7a Parents: 0973e34 Author: zentol <[email protected]> Authored: Wed Apr 4 19:22:53 2018 +0200 Committer: zentol <[email protected]> Committed: Wed May 2 20:46:46 2018 +0200 ---------------------------------------------------------------------- .../exclamation/ExclamationWithBoltITCase.java | 30 ++++------- .../exclamation/ExclamationWithSpoutITCase.java | 27 ++++------ .../StormExclamationLocalITCase.java | 30 ++++------- .../flink/storm/join/SingleJoinITCase.java | 27 ++++------ .../storm/tests/StormFieldsGroupingITCase.java | 54 +++++++++---------- .../flink/storm/tests/StormMetaDataITCase.java | 9 ++-- .../flink/storm/tests/StormUnionITCase.java | 24 ++++----- .../wordcount/BoltTokenizerWordCountITCase.java | 27 ++++------ .../BoltTokenizerWordCountPojoITCase.java | 27 ++++------ .../BoltTokenizerWordCountWithNamesITCase.java | 27 ++++------ .../wordcount/SpoutSourceWordCountITCase.java | 27 ++++------ .../storm/wordcount/WordCountLocalITCase.java | 27 ++++------ .../wordcount/WordCountLocalNamedITCase.java | 27 ++++------ .../TopSpeedWindowingExampleITCase.java | 25 ++++----- .../ContinuousFileProcessingITCase.java | 14 ++--- .../python/api/PythonStreamBinderTest.java | 7 +-- .../scala/api/StatefulFunctionITCase.java | 10 ++-- .../util/StreamingProgramTestBase.java | 57 -------------------- .../api/outputformat/CsvOutputFormatITCase.java | 20 +++---- .../outputformat/TextOutputFormatITCase.java | 20 +++---- 20 files changed, 178 insertions(+), 338 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java index 358919f..f0725e9 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java @@ -19,33 +19,25 @@ package org.apache.flink.storm.exclamation; import org.apache.flink.storm.exclamation.util.ExclamationData; -import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.Test; /** * Test for the ExclamationWithBolt example. */ -public class ExclamationWithBoltITCase extends StreamingProgramTestBase { - - protected String textPath; - protected String resultPath; - protected String exclamationNum; +public class ExclamationWithBoltITCase extends AbstractTestBase { - @Override - protected void preSubmit() throws Exception { - this.textPath = this.createTempFile("text.txt", WordCountData.TEXT); - this.resultPath = this.getTempDirPath("result"); - this.exclamationNum = "3"; - } + @Test + public void testProgram() throws Exception { + String textPath = createTempFile("text.txt", WordCountData.TEXT); + String resultPath = getTempDirPath("result"); + String exclamationNum = "3"; - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath); - } + ExclamationWithBolt.main(new String[]{textPath, resultPath, exclamationNum}); - @Override - protected void testProgram() throws Exception { - ExclamationWithBolt.main(new String[]{this.textPath, this.resultPath, this.exclamationNum}); + compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, resultPath); } } http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java index 64294d1..4d16c4a 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java @@ -19,30 +19,23 @@ package org.apache.flink.storm.exclamation; import org.apache.flink.storm.exclamation.util.ExclamationData; -import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.Test; /** * Test for the ExclamationWithSpout example. */ -public class ExclamationWithSpoutITCase extends StreamingProgramTestBase { - - protected String textPath; - protected String resultPath; +public class ExclamationWithSpoutITCase extends AbstractTestBase { - @Override - protected void preSubmit() throws Exception { - this.textPath = this.createTempFile("text.txt", WordCountData.TEXT); - this.resultPath = this.getTempDirPath("result"); - } + @Test + public void testProgram() throws Exception { + String textPath = createTempFile("text.txt", WordCountData.TEXT); + String resultPath = getTempDirPath("result"); - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath); - } + ExclamationWithSpout.main(new String[]{textPath, resultPath}); - @Override - protected void testProgram() throws Exception { - ExclamationWithSpout.main(new String[]{this.textPath, this.resultPath}); + compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, resultPath); } } http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java index bc09a3d..c82da37 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java @@ -19,32 +19,24 @@ package org.apache.flink.storm.exclamation; import org.apache.flink.storm.exclamation.util.ExclamationData; -import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.Test; /** * Test for the ExclamationLocal example. */ -public class StormExclamationLocalITCase extends StreamingProgramTestBase { - - protected String textPath; - protected String resultPath; - protected String exclamationNum; +public class StormExclamationLocalITCase extends AbstractTestBase { - @Override - protected void preSubmit() throws Exception { - this.textPath = this.createTempFile("text.txt", WordCountData.TEXT); - this.resultPath = this.getTempDirPath("result"); - this.exclamationNum = "3"; - } + @Test + public void testProgram() throws Exception { + String textPath = createTempFile("text.txt", WordCountData.TEXT); + String resultPath = getTempDirPath("result"); + String exclamationNum = "3"; - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath); - } + ExclamationLocal.main(new String[]{textPath, resultPath, exclamationNum}); - @Override - protected void testProgram() throws Exception { - ExclamationLocal.main(new String[]{this.textPath, this.resultPath, this.exclamationNum}); + compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, resultPath); } } http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java index 5d406db..c00c154 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java @@ -18,14 +18,16 @@ package org.apache.flink.storm.join; -import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.shaded.guava18.com.google.common.base.Joiner; +import org.junit.Test; + /** * Test for the SingleJoin example. */ -public class SingleJoinITCase extends StreamingProgramTestBase { +public class SingleJoinITCase extends AbstractTestBase { protected static String[] expectedOutput = { "(male,20)", @@ -40,23 +42,14 @@ public class SingleJoinITCase extends StreamingProgramTestBase { "(female,29)" }; - protected String resultPath; - - @Override - protected void preSubmit() throws Exception { - this.resultPath = this.getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on("\n").join(expectedOutput), this.resultPath); - } - - @Override - protected void testProgram() throws Exception { + @Test + public void testProgram() throws Exception { + String resultPath = getTempDirPath("result"); // We need to remove the file scheme because we can't use the Flink file system. // (to remain compatible with Storm) - SingleJoinExample.main(new String[]{ this.resultPath.replace("file:", "") }); + SingleJoinExample.main(new String[]{resultPath.replace("file:", "")}); + + compareResultsByLinesInMemory(Joiner.on("\n").join(expectedOutput), resultPath); } } http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java index 6e02d81..6905953 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java @@ -24,13 +24,14 @@ import org.apache.flink.storm.tests.operators.FiniteRandomSpout; import org.apache.flink.storm.tests.operators.TaskIdBolt; import org.apache.flink.storm.util.BoltFileSink; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.util.MathUtils; import org.apache.storm.Config; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; import org.junit.Assert; +import org.junit.Test; import java.util.ArrayList; import java.util.Arrays; @@ -41,24 +42,36 @@ import java.util.List; * This test relies on the hash function used by the {@link DataStream#keyBy}, which is * assumed to be {@link MathUtils#murmurHash}. */ -public class StormFieldsGroupingITCase extends StreamingProgramTestBase { +public class StormFieldsGroupingITCase extends AbstractTestBase { private static final String topologyId = "FieldsGrouping Test"; private static final String spoutId = "spout"; private static final String boltId = "bolt"; private static final String sinkId = "sink"; - private String resultPath; - @Override - protected void preSubmit() throws Exception { - this.resultPath = this.getTempDirPath("result"); - } + @Test + public void testProgram() throws Exception { + String resultPath = this.getTempDirPath("result"); + + final String[] tokens = resultPath.split(":"); + final String outputFile = tokens[tokens.length - 1]; + + final TopologyBuilder builder = new TopologyBuilder(); + + builder.setSpout(spoutId, new FiniteRandomSpout(0, 10, 2)); + builder.setBolt(boltId, new TaskIdBolt(), 2).fieldsGrouping( + spoutId, FiniteRandomSpout.STREAM_PREFIX + 0, new Fields("number")); + builder.setBolt(sinkId, new BoltFileSink(outputFile)).shuffleGrouping(boltId); + + final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster(); + Config conf = new Config(); + conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true); // only required to stabilize integration test + cluster.submitTopology(topologyId, conf, FlinkTopology.createTopology(builder)); + cluster.shutdown(); - @Override - protected void postSubmit() throws Exception { List<String> expectedResults = Arrays.asList( - "-1155484576", "1033096058", "-1930858313", "1431162155", "-1557280266", "-1728529858", "1654374947", - "-65105105", "-518907128", "-252332814"); + "-1155484576", "1033096058", "-1930858313", "1431162155", "-1557280266", "-1728529858", "1654374947", + "-65105105", "-518907128", "-252332814"); List<String> actualResults = new ArrayList<>(); readAllResultLines(actualResults, resultPath, new String[0], false); @@ -82,23 +95,4 @@ public class StormFieldsGroupingITCase extends StreamingProgramTestBase { } } - @Override - protected void testProgram() throws Exception { - final String[] tokens = this.resultPath.split(":"); - final String outputFile = tokens[tokens.length - 1]; - - final TopologyBuilder builder = new TopologyBuilder(); - - builder.setSpout(spoutId, new FiniteRandomSpout(0, 10, 2)); - builder.setBolt(boltId, new TaskIdBolt(), 2).fieldsGrouping( - spoutId, FiniteRandomSpout.STREAM_PREFIX + 0, new Fields("number")); - builder.setBolt(sinkId, new BoltFileSink(outputFile)).shuffleGrouping(boltId); - - final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster(); - Config conf = new Config(); - conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true); // only required to stabilize integration test - cluster.submitTopology(topologyId, conf, FlinkTopology.createTopology(builder)); - cluster.shutdown(); - } - } http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormMetaDataITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormMetaDataITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormMetaDataITCase.java index fe09daf..c24a95e 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormMetaDataITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormMetaDataITCase.java @@ -22,24 +22,25 @@ import org.apache.flink.storm.api.FlinkLocalCluster; import org.apache.flink.storm.api.FlinkTopology; import org.apache.flink.storm.tests.operators.MetaDataSpout; import org.apache.flink.storm.tests.operators.VerifyMetaDataBolt; -import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.test.util.AbstractTestBase; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.utils.Utils; import org.junit.Assert; +import org.junit.Test; /** * Test for meta data spouts/bolts. */ -public class StormMetaDataITCase extends StreamingProgramTestBase { +public class StormMetaDataITCase extends AbstractTestBase { private static final String topologyId = "FieldsGrouping Test"; private static final String spoutId = "spout"; private static final String boltId1 = "bolt1"; private static final String boltId2 = "bolt2"; - @Override - protected void testProgram() throws Exception { + @Test + public void testProgram() throws Exception { final TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(spoutId, new MetaDataSpout(), 2); http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormUnionITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormUnionITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormUnionITCase.java index 12e897a..6f6e47f 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormUnionITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormUnionITCase.java @@ -23,15 +23,16 @@ import org.apache.flink.storm.api.FlinkTopology; import org.apache.flink.storm.tests.operators.FiniteRandomSpout; import org.apache.flink.storm.tests.operators.MergerBolt; import org.apache.flink.storm.util.BoltFileSink; -import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.test.util.AbstractTestBase; import org.apache.storm.Config; import org.apache.storm.topology.TopologyBuilder; +import org.junit.Test; /** * Test for the {@link MergerBolt}. */ -public class StormUnionITCase extends StreamingProgramTestBase { +public class StormUnionITCase extends AbstractTestBase { private static final String RESULT = "-1154715079\n" + "-1155869325\n" + "-1155484576\n" + "431529176\n" + "1260042744\n" + "1761283695\n" + "1749940626\n" + "892128508\n" @@ -47,20 +48,11 @@ public class StormUnionITCase extends StreamingProgramTestBase { private static final String spoutId3 = "spout3"; private static final String boltId = "merger"; private static final String sinkId = "sink"; - private String resultPath; - @Override - protected void preSubmit() throws Exception { - this.resultPath = this.getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(RESULT, this.resultPath); - } + @Test + public void testProgram() throws Exception { + String resultPath = this.getTempDirPath("result"); - @Override - protected void testProgram() throws Exception { final TopologyBuilder builder = new TopologyBuilder(); // get input data @@ -73,7 +65,7 @@ public class StormUnionITCase extends StreamingProgramTestBase { .shuffleGrouping(spoutId2, FiniteRandomSpout.STREAM_PREFIX + 0) .shuffleGrouping(spoutId3, FiniteRandomSpout.STREAM_PREFIX + 0); - final String[] tokens = this.resultPath.split(":"); + final String[] tokens = resultPath.split(":"); final String outputFile = tokens[tokens.length - 1]; builder.setBolt(sinkId, new BoltFileSink(outputFile)).shuffleGrouping(boltId); @@ -83,6 +75,8 @@ public class StormUnionITCase extends StreamingProgramTestBase { conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true); // only required to stabilize integration test cluster.submitTopology(topologyId, conf, FlinkTopology.createTopology(builder)); cluster.shutdown(); + + compareResultsByLinesInMemory(RESULT, resultPath); } } http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountITCase.java index d1cc5a2..57e5d42 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountITCase.java @@ -18,31 +18,24 @@ package org.apache.flink.storm.wordcount; -import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.Test; /** * Test for the BoltTokenizerWordCount example. */ -public class BoltTokenizerWordCountITCase extends StreamingProgramTestBase { - - protected String textPath; - protected String resultPath; +public class BoltTokenizerWordCountITCase extends AbstractTestBase { - @Override - protected void preSubmit() throws Exception { - this.textPath = this.createTempFile("text.txt", WordCountData.TEXT); - this.resultPath = this.getTempDirPath("result"); - } + @Test + public void testProgram() throws Exception { + String textPath = createTempFile("text.txt", WordCountData.TEXT); + String resultPath = getTempDirPath("result"); - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath); - } + BoltTokenizerWordCount.main(new String[]{textPath, resultPath}); - @Override - protected void testProgram() throws Exception { - BoltTokenizerWordCount.main(new String[]{this.textPath, this.resultPath}); + compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath); } } http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojoITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojoITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojoITCase.java index 0eb4a6e..656700e 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojoITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojoITCase.java @@ -18,31 +18,24 @@ package org.apache.flink.storm.wordcount; -import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.Test; /** * Test for the BoltTokenizerWordCountPojo example. */ -public class BoltTokenizerWordCountPojoITCase extends StreamingProgramTestBase { - - protected String textPath; - protected String resultPath; +public class BoltTokenizerWordCountPojoITCase extends AbstractTestBase { - @Override - protected void preSubmit() throws Exception { - this.textPath = this.createTempFile("text.txt", WordCountData.TEXT); - this.resultPath = this.getTempDirPath("result"); - } + @Test + public void testProgram() throws Exception { + String textPath = createTempFile("text.txt", WordCountData.TEXT); + String resultPath = getTempDirPath("result"); - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath); - } + BoltTokenizerWordCountPojo.main(new String[]{textPath, resultPath}); - @Override - protected void testProgram() throws Exception { - BoltTokenizerWordCountPojo.main(new String[]{this.textPath, this.resultPath}); + compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath); } } http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNamesITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNamesITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNamesITCase.java index 8879d9e..18e1f01 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNamesITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNamesITCase.java @@ -18,31 +18,24 @@ package org.apache.flink.storm.wordcount; -import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.Test; /** * Test for the BoltTokenizerWordCountWithNames example. */ -public class BoltTokenizerWordCountWithNamesITCase extends StreamingProgramTestBase { - - protected String textPath; - protected String resultPath; +public class BoltTokenizerWordCountWithNamesITCase extends AbstractTestBase { - @Override - protected void preSubmit() throws Exception { - this.textPath = this.createTempFile("text.txt", WordCountData.TEXT); - this.resultPath = this.getTempDirPath("result"); - } + @Test + public void testProgram() throws Exception { + String textPath = createTempFile("text.txt", WordCountData.TEXT); + String resultPath = getTempDirPath("result"); - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath); - } + BoltTokenizerWordCountWithNames.main(new String[]{textPath, resultPath}); - @Override - protected void testProgram() throws Exception { - BoltTokenizerWordCountWithNames.main(new String[]{this.textPath, this.resultPath}); + compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath); } } http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/SpoutSourceWordCountITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/SpoutSourceWordCountITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/SpoutSourceWordCountITCase.java index ec2ca2c..594f56e 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/SpoutSourceWordCountITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/SpoutSourceWordCountITCase.java @@ -18,31 +18,24 @@ package org.apache.flink.storm.wordcount; -import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.Test; /** * Test for the SpoutSourceWordCount example. */ -public class SpoutSourceWordCountITCase extends StreamingProgramTestBase { - - protected String textPath; - protected String resultPath; +public class SpoutSourceWordCountITCase extends AbstractTestBase { - @Override - protected void preSubmit() throws Exception { - this.textPath = this.createTempFile("text.txt", WordCountData.TEXT); - this.resultPath = this.getTempDirPath("result"); - } + @Test + public void testProgram() throws Exception { + String textPath = createTempFile("text.txt", WordCountData.TEXT); + String resultPath = getTempDirPath("result"); - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath); - } + SpoutSourceWordCount.main(new String[]{textPath, resultPath}); - @Override - protected void testProgram() throws Exception { - SpoutSourceWordCount.main(new String[]{this.textPath, this.resultPath}); + compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath); } } http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java index 471afa9..16844e5 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java @@ -18,31 +18,24 @@ package org.apache.flink.storm.wordcount; -import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.Test; /** * Test for the WordCountLocal example. */ -public class WordCountLocalITCase extends StreamingProgramTestBase { - - protected String textPath; - protected String resultPath; +public class WordCountLocalITCase extends AbstractTestBase { - @Override - protected void preSubmit() throws Exception { - this.textPath = this.createTempFile("text.txt", WordCountData.TEXT); - this.resultPath = this.getTempDirPath("result"); - } + @Test + public void testProgram() throws Exception { + String textPath = createTempFile("text.txt", WordCountData.TEXT); + String resultPath = getTempDirPath("result"); - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath); - } + WordCountLocal.main(new String[]{textPath, resultPath}); - @Override - protected void testProgram() throws Exception { - WordCountLocal.main(new String[] { this.textPath, this.resultPath }); + compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath); } } http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java index 445ea37..0353c2c 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java @@ -18,31 +18,24 @@ package org.apache.flink.storm.wordcount; -import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.Test; /** * Test for the WordCountLocalByName example. */ -public class WordCountLocalNamedITCase extends StreamingProgramTestBase { - - protected String textPath; - protected String resultPath; +public class WordCountLocalNamedITCase extends AbstractTestBase { - @Override - protected void preSubmit() throws Exception { - this.textPath = this.createTempFile("text.txt", WordCountData.TEXT); - this.resultPath = this.getTempDirPath("result"); - } + @Test + public void testProgram() throws Exception { + String textPath = createTempFile("text.txt", WordCountData.TEXT); + String resultPath = getTempDirPath("result"); - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath); - } + WordCountLocalByName.main(new String[]{textPath, resultPath}); - @Override - protected void testProgram() throws Exception { - WordCountLocalByName.main(new String[] { this.textPath, this.resultPath }); + compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath); } } http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java index db27c60..6a98096 100644 --- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java +++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java @@ -19,31 +19,24 @@ package org.apache.flink.streaming.test.scala.examples.windowing; import org.apache.flink.streaming.examples.windowing.util.TopSpeedWindowingExampleData; import org.apache.flink.streaming.scala.examples.windowing.TopSpeedWindowing; -import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.Test; /** * Tests for {@link TopSpeedWindowing}. */ -public class TopSpeedWindowingExampleITCase extends StreamingProgramTestBase { - protected String textPath; - protected String resultPath; - - @Override - protected void preSubmit() throws Exception { - textPath = createTempFile("text.txt", TopSpeedWindowingExampleData.CAR_DATA); - resultPath = getTempDirPath("result"); - } +public class TopSpeedWindowingExampleITCase extends AbstractTestBase { - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(TopSpeedWindowingExampleData.TOP_CASE_CLASS_SPEEDS, resultPath); - } + @Test + public void testProgram() throws Exception { + String textPath = createTempFile("text.txt", TopSpeedWindowingExampleData.CAR_DATA); + String resultPath = getTempDirPath("result"); - @Override - protected void testProgram() throws Exception { TopSpeedWindowing.main(new String[]{ "--input", textPath, "--output", resultPath}); + compareResultsByLinesInMemory(TopSpeedWindowingExampleData.TOP_CASE_CLASS_SPEEDS, resultPath); } } http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java ---------------------------------------------------------------------- diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java index 42fddf5..65e46b5 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java @@ -33,7 +33,7 @@ import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringF import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator; import org.apache.flink.streaming.api.functions.source.FileProcessingMode; import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit; -import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.test.util.AbstractTestBase; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileUtil; @@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Test; import java.io.File; import java.io.IOException; @@ -58,7 +59,7 @@ import static org.junit.Assert.assertEquals; /** * IT cases for the {@link ContinuousFileMonitoringFunction} and {@link ContinuousFileReaderOperator}. */ -public class ContinuousFileProcessingITCase extends StreamingProgramTestBase { +public class ContinuousFileProcessingITCase extends AbstractTestBase { private static final int NO_OF_FILES = 5; private static final int LINES_PER_FILE = 100; @@ -110,8 +111,8 @@ public class ContinuousFileProcessingITCase extends StreamingProgramTestBase { // END OF PREPARATIONS - @Override - protected void testProgram() throws Exception { + @Test + public void testProgram() throws Exception { /* * This test checks the interplay between the monitor and the reader @@ -159,11 +160,6 @@ public class ContinuousFileProcessingITCase extends StreamingProgramTestBase { Throwable th = e; for (int depth = 0; depth < 20; depth++) { if (th instanceof SuccessException) { - try { - postSubmit(); - } catch (Exception e1) { - e1.printStackTrace(); - } return; } else if (th.getCause() != null) { th = th.getCause(); http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java b/flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java index 9b3ddd3..e03c9ef 100644 --- a/flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java +++ b/flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java @@ -24,9 +24,10 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.local.LocalFileSystem; import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.util.Preconditions; +import org.junit.Test; import org.python.core.PyException; import java.util.ArrayList; @@ -35,7 +36,7 @@ import java.util.List; /** * Tests for the {@link PythonStreamBinder}. */ -public class PythonStreamBinderTest extends StreamingProgramTestBase { +public class PythonStreamBinderTest extends AbstractTestBase { private static Path getBaseTestPythonDir() { FileSystem fs = new LocalFileSystem(); @@ -60,7 +61,7 @@ public class PythonStreamBinderTest extends StreamingProgramTestBase { return files; } - @Override + @Test public void testProgram() throws Exception { Path testEntryPoint = new Path(getBaseTestPythonDir(), "examples/word_count.py"); List<String> testFiles = findTestFiles(); http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/StatefulFunctionITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/StatefulFunctionITCase.java b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/StatefulFunctionITCase.java index 2cb87b6..40e8ad9 100644 --- a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/StatefulFunctionITCase.java +++ b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/StatefulFunctionITCase.java @@ -18,15 +18,17 @@ package org.apache.flink.streaming.scala.api; import org.apache.flink.streaming.api.scala.StateTestPrograms; -import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.Test; /** * IT case using stateful functions. */ -public class StatefulFunctionITCase extends StreamingProgramTestBase { +public class StatefulFunctionITCase extends AbstractTestBase { - @Override - protected void testProgram() throws Exception { + @Test + public void testProgram() throws Exception { StateTestPrograms.testStatefulFunctions(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java deleted file mode 100644 index a18be08..0000000 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.util; - -import org.apache.flink.test.util.AbstractTestBase; - -import org.junit.Test; - -/** - * Base class for unit tests that run a single test. - * - * <p>To write a unit test against this test base, simply extend it and implement the {@link #testProgram()} method. - */ -public abstract class StreamingProgramTestBase extends AbstractTestBase { - - // -------------------------------------------------------------------------------------------- - // Methods to create the test program and for pre- and post- test work - // -------------------------------------------------------------------------------------------- - - protected abstract void testProgram() throws Exception; - - protected void preSubmit() throws Exception {} - - protected void postSubmit() throws Exception {} - - // -------------------------------------------------------------------------------------------- - // Test entry point - // -------------------------------------------------------------------------------------------- - - @Test - public void testJob() throws Exception { - // pre-submit - preSubmit(); - - // call the test program - testProgram(); - - // post-submit - postSubmit(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java index a95865a..cd85d26 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java @@ -20,24 +20,21 @@ package org.apache.flink.test.streaming.api.outputformat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; import org.apache.flink.test.testfunctions.Tokenizer; +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.Test; /** * Integration tests for {@link org.apache.flink.api.java.io.CsvOutputFormat}. */ -public class CsvOutputFormatITCase extends StreamingProgramTestBase { - - protected String resultPath; +public class CsvOutputFormatITCase extends AbstractTestBase { - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } + @Test + public void testProgram() throws Exception { + String resultPath = getTempDirPath("result"); - @Override - protected void testProgram() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> text = env.fromElements(WordCountData.TEXT); @@ -49,10 +46,7 @@ public class CsvOutputFormatITCase extends StreamingProgramTestBase { counts.writeAsCsv(resultPath); env.execute("WriteAsCsvTest"); - } - @Override - protected void postSubmit() throws Exception { //Strip the parentheses from the expected text like output compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES .replaceAll("[\\\\(\\\\)]", ""), resultPath); http://git-wip-us.apache.org/repos/asf/flink/blob/de10b400/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java index 7f0ebc9..d5a5b5c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java @@ -20,24 +20,21 @@ package org.apache.flink.test.streaming.api.outputformat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; import org.apache.flink.test.testfunctions.Tokenizer; +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.Test; /** * Integration tests for {@link org.apache.flink.api.java.io.TextOutputFormat}. */ -public class TextOutputFormatITCase extends StreamingProgramTestBase { - - protected String resultPath; +public class TextOutputFormatITCase extends AbstractTestBase { - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } + @Test + public void testProgram() throws Exception { + String resultPath = getTempDirPath("result"); - @Override - protected void testProgram() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> text = env.fromElements(WordCountData.TEXT); @@ -49,10 +46,7 @@ public class TextOutputFormatITCase extends StreamingProgramTestBase { counts.writeAsText(resultPath); env.execute("WriteAsTextTest"); - } - @Override - protected void postSubmit() throws Exception { compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath); }
