[FLINK-2275] [tests] Aigrated test from execute() to collect()

 -> for package 'org.apache.flink.test.javaApiOperators'

 Seactivated unstable test (see comment section 
https://issues.apache.org/jira/browse/FLINK-2275)

This closes #866


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a137321a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a137321a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a137321a

Branch: refs/heads/master
Commit: a137321acfce02d62ce48e03b5de37c388152d28
Parents: 4e9e0d6
Author: mjsax <[email protected]>
Authored: Thu Jun 25 01:49:23 2015 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Wed Jul 1 16:58:26 2015 +0200

----------------------------------------------------------------------
 .../streaming/connectors/kafka/KafkaITCase.java |   2 +
 .../test/javaApiOperators/AggregateITCase.java  |  59 +-
 .../test/javaApiOperators/CoGroupITCase.java    |   8 +-
 .../test/javaApiOperators/CrossITCase.java      | 140 +++--
 .../test/javaApiOperators/DataSinkITCase.java   |  38 +-
 .../test/javaApiOperators/DataSourceITCase.java |  22 +-
 .../test/javaApiOperators/DistinctITCase.java   | 100 ++--
 .../test/javaApiOperators/FilterITCase.java     |  83 ++-
 .../test/javaApiOperators/FirstNITCase.java     |  57 +-
 .../test/javaApiOperators/FlatMapITCase.java    |  82 ++-
 .../javaApiOperators/GroupCombineITCase.java    | 204 +++----
 .../javaApiOperators/GroupReduceITCase.java     | 532 ++++++++++---------
 .../flink/test/javaApiOperators/JoinITCase.java | 320 +++++------
 .../flink/test/javaApiOperators/MapITCase.java  |  95 ++--
 .../test/javaApiOperators/PartitionITCase.java  |  75 +--
 .../test/javaApiOperators/ProjectITCase.java    |  25 +-
 .../test/javaApiOperators/ReduceITCase.java     | 141 +++--
 .../ReplicatingDataSourceITCase.java            |  35 +-
 .../javaApiOperators/SortPartitionITCase.java   |  96 ++--
 .../test/javaApiOperators/SumMinMaxITCase.java  |  43 +-
 .../test/javaApiOperators/TypeHintITCase.java   |  65 +--
 .../test/javaApiOperators/UnionITCase.java      |  46 +-
 .../util/CollectionDataSets.java                |  14 +
 23 files changed, 1016 insertions(+), 1266 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
 
b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 2af56c1..4b763b2 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -75,6 +75,7 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
@@ -224,6 +225,7 @@ public class KafkaITCase {
         *
         */
        @Test
+       @Ignore
        public void testPersistentSourceWithOffsetUpdates() throws Exception {
                LOG.info("Starting testPersistentSourceWithOffsetUpdates()");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
index ea7fc5a..d02f228 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
@@ -18,17 +18,15 @@
 
 package org.apache.flink.test.javaApiOperators;
 
+import java.util.List;
+
 import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.apache.flink.api.java.DataSet;
@@ -38,31 +36,15 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 public class AggregateITCase extends MultipleProgramsTestBase {
 
 
-       public AggregateITCase(TestExecutionMode mode){
+       public AggregateITCase(TestExecutionMode mode) {
                super(mode);
        }
 
-       private String resultPath;
-       private String expected;
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception{
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-
        @Test
        public void testFullAggregate() throws Exception {
                /*
-                                * Full Aggregate
-                                */
+                * Full Aggregate
+                */
 
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
@@ -70,36 +52,38 @@ public class AggregateITCase extends 
MultipleProgramsTestBase {
                DataSet<Tuple2<Integer, Long>> aggregateDs = ds
                                .aggregate(Aggregations.SUM, 0)
                                .and(Aggregations.MAX, 1)
-                                               .project(0, 1);
+                               .project(0, 1);
+
+               List<Tuple2<Integer, Long>> result = aggregateDs.collect();
 
-               aggregateDs.writeAsCsv(resultPath);
-               env.execute();
+               String expected = "231,6\n";
 
-               expected = "231,6\n";
+               compareResultAsTuples(result, expected);
        }
 
        @Test
        public void testGroupedAggregate() throws Exception {
                /*
-                                * Grouped Aggregate
-                                */
+                * Grouped Aggregate
+                */
 
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
                DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
                DataSet<Tuple2<Long, Integer>> aggregateDs = ds.groupBy(1)
                                .aggregate(Aggregations.SUM, 0)
-                                               .project(1, 0);
+                               .project(1, 0);
 
-               aggregateDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple2<Long, Integer>> result = aggregateDs.collect();
 
-               expected = "1,1\n" +
+               String expected = "1,1\n" +
                                "2,5\n" +
                                "3,15\n" +
                                "4,34\n" +
                                "5,65\n" +
                                "6,111\n";
+
+               compareResultAsTuples(result, expected);
        }
 
        @Test
@@ -114,11 +98,12 @@ public class AggregateITCase extends 
MultipleProgramsTestBase {
                DataSet<Tuple1<Integer>> aggregateDs = ds.groupBy(1)
                                .aggregate(Aggregations.MIN, 0)
                                .aggregate(Aggregations.MIN, 0)
-                                               .project(0);
+                               .project(0);
+
+               List<Tuple1<Integer>> result = aggregateDs.collect();
 
-               aggregateDs.writeAsCsv(resultPath);
-               env.execute();
+               String expected = "1\n";
 
-               expected = "1\n";
+               compareResultAsTuples(result, expected);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
index 7faa6cc..7bc8480 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
@@ -78,8 +78,8 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
        @Test
        public void testCoGroupOnTwoCustomTypeInputsWithKeyExtractors() throws 
Exception {
                /*
-                                * CoGroup on two custom type inputs with key 
extractors
-                                */
+                * CoGroup on two custom type inputs with key extractors
+                */
 
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
@@ -515,7 +515,9 @@ public class CoGroupITCase extends MultipleProgramsTestBase 
{
 
                compareResultAsText(result, expected);
        }
-       
+
+
+
        // 
--------------------------------------------------------------------------------------------
        //  UDF classes
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
index 74868a0..63d1ec7 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.test.javaApiOperators;
 
 import java.util.Collection;
+import java.util.List;
 
 import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.common.functions.RichCrossFunction;
@@ -30,11 +31,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import 
org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.apache.flink.api.java.DataSet;
@@ -47,22 +44,6 @@ public class CrossITCase extends MultipleProgramsTestBase {
                super(mode);
        }
 
-       private String resultPath;
-       private String expected;
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception{
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-
        @Test
        public void testCorretnessOfCrossOnTwoTupleInputs() throws Exception {
                /*
@@ -75,10 +56,9 @@ public class CrossITCase extends MultipleProgramsTestBase {
                DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.getSmall5TupleDataSet(env);
                DataSet<Tuple2<Integer, String>> crossDs = 
ds.cross(ds2).with(new Tuple5Cross());
 
-               crossDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple2<Integer, String>> result = crossDs.collect();
 
-               expected = "0,HalloHallo\n" +
+               String expected = "0,HalloHallo\n" +
                                "1,HalloHallo Welt\n" +
                                "2,HalloHallo Welt wie\n" +
                                "1,Hallo WeltHallo\n" +
@@ -87,6 +67,8 @@ public class CrossITCase extends MultipleProgramsTestBase {
                                "2,Hallo Welt wieHallo\n" +
                                "3,Hallo Welt wieHallo Welt\n" +
                                "4,Hallo Welt wieHallo Welt wie\n";
+
+               compareResultAsTuples(result, expected);
        }
 
        @Test
@@ -101,10 +83,9 @@ public class CrossITCase extends MultipleProgramsTestBase {
                DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.getSmall5TupleDataSet(env);
                DataSet<Tuple3<Integer, Long, String>> crossDs = 
ds.cross(ds2).with(new Tuple3ReturnLeft());
 
-               crossDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple3<Integer, Long, String>> result = crossDs.collect();
 
-               expected = "1,1,Hi\n" +
+               String expected = "1,1,Hi\n" +
                                "1,1,Hi\n" +
                                "1,1,Hi\n" +
                                "2,2,Hello\n" +
@@ -113,6 +94,8 @@ public class CrossITCase extends MultipleProgramsTestBase {
                                "3,2,Hello world\n" +
                                "3,2,Hello world\n" +
                                "3,2,Hello world\n";
+
+               compareResultAsTuples(result, expected);
        }
 
        @Test
@@ -127,10 +110,10 @@ public class CrossITCase extends MultipleProgramsTestBase 
{
                DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.getSmall5TupleDataSet(env);
                DataSet<Tuple5<Integer, Long, Integer, String, Long>> crossDs = 
ds.cross(ds2).with(new Tuple5ReturnRight());
 
-               crossDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple5<Integer, Long, Integer, String, Long>> result = 
crossDs
+                               .collect();
 
-               expected = "1,1,0,Hallo,1\n" +
+               String expected = "1,1,0,Hallo,1\n" +
                                "1,1,0,Hallo,1\n" +
                                "1,1,0,Hallo,1\n" +
                                "2,2,1,Hallo Welt,2\n" +
@@ -140,6 +123,7 @@ public class CrossITCase extends MultipleProgramsTestBase {
                                "2,3,2,Hallo Welt wie,1\n" +
                                "2,3,2,Hallo Welt wie,1\n";
 
+               compareResultAsTuples(result, expected);
        }
 
        @Test
@@ -156,10 +140,9 @@ public class CrossITCase extends MultipleProgramsTestBase {
                DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.getSmall5TupleDataSet(env);
                DataSet<Tuple3<Integer, Integer, Integer>> crossDs = 
ds.cross(ds2).with(new Tuple5CrossBC()).withBroadcastSet(intDs, "ints");
 
-               crossDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple3<Integer, Integer, Integer>> result = 
crossDs.collect();
 
-               expected = "2,0,55\n" +
+               String expected = "2,0,55\n" +
                                "3,0,55\n" +
                                "3,0,55\n" +
                                "3,0,55\n" +
@@ -168,6 +151,8 @@ public class CrossITCase extends MultipleProgramsTestBase {
                                "3,0,55\n" +
                                "4,2,55\n" +
                                "4,4,55\n";
+
+               compareResultAsTuples(result, expected);
        }
 
        @Test
@@ -182,10 +167,9 @@ public class CrossITCase extends MultipleProgramsTestBase {
                DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.getSmall5TupleDataSet(env);
                DataSet<Tuple2<Integer, String>> crossDs = 
ds.crossWithHuge(ds2).with(new Tuple5Cross());
 
-               crossDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple2<Integer, String>> result = crossDs.collect();
 
-               expected = "0,HalloHallo\n" +
+               String expected = "0,HalloHallo\n" +
                                "1,HalloHallo Welt\n" +
                                "2,HalloHallo Welt wie\n" +
                                "1,Hallo WeltHallo\n" +
@@ -194,6 +178,8 @@ public class CrossITCase extends MultipleProgramsTestBase {
                                "2,Hallo Welt wieHallo\n" +
                                "3,Hallo Welt wieHallo Welt\n" +
                                "4,Hallo Welt wieHallo Welt wie\n";
+
+               compareResultAsTuples(result, expected);
        }
 
        @Test
@@ -208,10 +194,9 @@ public class CrossITCase extends MultipleProgramsTestBase {
                DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.getSmall5TupleDataSet(env);
                DataSet<Tuple2<Integer, String>> crossDs = 
ds.crossWithTiny(ds2).with(new Tuple5Cross());
 
-               crossDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple2<Integer, String>> result = crossDs.collect();
 
-               expected = "0,HalloHallo\n" +
+               String expected = "0,HalloHallo\n" +
                                "1,HalloHallo Welt\n" +
                                "2,HalloHallo Welt wie\n" +
                                "1,Hallo WeltHallo\n" +
@@ -220,6 +205,8 @@ public class CrossITCase extends MultipleProgramsTestBase {
                                "2,Hallo Welt wieHallo\n" +
                                "3,Hallo Welt wieHallo Welt\n" +
                                "4,Hallo Welt wieHallo Welt wie\n";
+
+               compareResultAsTuples(result, expected);
        }
 
        @Test
@@ -236,12 +223,11 @@ public class CrossITCase extends MultipleProgramsTestBase 
{
                                .projectFirst(2, 1)
                                .projectSecond(3)
                                .projectFirst(0)
-                                       .projectSecond(4,1);
+                               .projectSecond(4,1);
 
-               crossDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple6<String, Long, String, Integer, Long, Long>> result 
= crossDs.collect();
 
-               expected = "Hi,1,Hallo,1,1,1\n" +
+               String expected = "Hi,1,Hallo,1,1,1\n" +
                                "Hi,1,Hallo Welt,1,2,2\n" +
                                "Hi,1,Hallo Welt wie,1,1,3\n" +
                                "Hello,2,Hallo,2,1,1\n" +
@@ -250,6 +236,8 @@ public class CrossITCase extends MultipleProgramsTestBase {
                                "Hello world,2,Hallo,3,1,1\n" +
                                "Hello world,2,Hallo Welt,3,2,2\n" +
                                "Hello world,2,Hallo Welt wie,3,1,3\n";
+
+               compareResultAsTuples(result, expected);
        }
 
        @Test
@@ -266,12 +254,11 @@ public class CrossITCase extends MultipleProgramsTestBase 
{
                                .projectSecond(3)
                                .projectFirst(2, 1)
                                .projectSecond(4,1)
-                                               .projectFirst(0);
+                               .projectFirst(0);
 
-               crossDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple6<String, String, Long, Long, Long, Integer>> result 
= crossDs.collect();
 
-               expected = "Hallo,Hi,1,1,1,1\n" +
+               String expected = "Hallo,Hi,1,1,1,1\n" +
                                "Hallo Welt,Hi,1,2,2,1\n" +
                                "Hallo Welt wie,Hi,1,1,3,1\n" +
                                "Hallo,Hello,2,1,1,2\n" +
@@ -281,6 +268,7 @@ public class CrossITCase extends MultipleProgramsTestBase {
                                "Hallo Welt,Hello world,2,2,2,3\n" +
                                "Hallo Welt wie,Hello world,2,1,3,3\n";
 
+               compareResultAsTuples(result, expected);
        }
 
        @Test
@@ -295,10 +283,10 @@ public class CrossITCase extends MultipleProgramsTestBase 
{
                DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.getSmall5TupleDataSet(env);
                DataSet<Tuple2<Tuple3<Integer, Long, String>, Tuple5<Integer, 
Long, Integer, String, Long>>> crossDs = ds.cross(ds2);
 
-               crossDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple2<Tuple3<Integer, Long, String>, Tuple5<Integer, 
Long, Integer, String, Long>>> result = crossDs.collect();
 
-               expected = "(1,1,Hi),(2,2,1,Hallo Welt,2)\n" +
+               String expected = "(1,1,Hi),(2,2,1,Hallo Welt,2)\n"
+                               +
                                "(1,1,Hi),(1,1,0,Hallo,1)\n" +
                                "(1,1,Hi),(2,3,2,Hallo Welt wie,1)\n" +
                                "(2,2,Hello),(2,2,1,Hallo Welt,2)\n" +
@@ -307,6 +295,8 @@ public class CrossITCase extends MultipleProgramsTestBase {
                                "(3,2,Hello world),(2,2,1,Hallo Welt,2)\n" +
                                "(3,2,Hello world),(1,1,0,Hallo,1)\n" +
                                "(3,2,Hello world),(2,3,2,Hallo Welt wie,1)\n";
+
+               compareResultAsTuples(result, expected);
        }
 
        @Test
@@ -321,10 +311,9 @@ public class CrossITCase extends MultipleProgramsTestBase {
                DataSet<CustomType> ds2 = 
CollectionDataSets.getSmallCustomTypeDataSet(env);
                DataSet<CustomType> crossDs = ds.cross(ds2).with(new 
CustomTypeCross());
 
-               crossDs.writeAsText(resultPath);
-               env.execute();
+               List<CustomType> result = crossDs.collect();
 
-               expected = "1,0,HiHi\n"
+               String expected = "1,0,HiHi\n"
                                + "2,1,HiHello\n"
                                + "2,2,HiHello world\n"
                                + "2,1,HelloHi\n"
@@ -333,6 +322,8 @@ public class CrossITCase extends MultipleProgramsTestBase {
                                + "2,2,Hello worldHi\n"
                                + "4,3,Hello worldHello\n"
                                + "4,4,Hello worldHello world";
+
+               compareResultAsText(result, expected);
        }
 
        @Test
@@ -347,10 +338,9 @@ public class CrossITCase extends MultipleProgramsTestBase {
                DataSet<CustomType> ds2 = 
CollectionDataSets.getSmallCustomTypeDataSet(env);
                DataSet<Tuple3<Integer, Long, String>> crossDs = 
ds.cross(ds2).with(new MixedCross());
 
-               crossDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple3<Integer, Long, String>> result = crossDs.collect();
 
-               expected = "2,0,HalloHi\n" +
+               String expected = "2,0,HalloHi\n" +
                                "3,0,HalloHello\n" +
                                "3,0,HalloHello world\n" +
                                "3,0,Hallo WeltHi\n" +
@@ -359,24 +349,26 @@ public class CrossITCase extends MultipleProgramsTestBase 
{
                                "3,0,Hallo Welt wieHi\n" +
                                "4,2,Hallo Welt wieHello\n" +
                                "4,4,Hallo Welt wieHello world\n";
+
+               compareResultAsTuples(result, expected);
        }
-       
+
        public static class Tuple5Cross implements 
CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, 
Long, Integer, String, Long>, Tuple2<Integer, String>> {
 
                private static final long serialVersionUID = 1L;
 
-               
+
                @Override
                public Tuple2<Integer, String> cross(
                                Tuple5<Integer, Long, Integer, String, Long> 
first,
                                Tuple5<Integer, Long, Integer, String, Long> 
second)
                                throws Exception {
-                       
+
                                return new Tuple2<Integer, 
String>(first.f2+second.f2, first.f3+second.f3);
                }
 
        }
-       
+
        public static class CustomTypeCross implements 
CrossFunction<CustomType, CustomType, CustomType> {
 
                private static final long serialVersionUID = 1L;
@@ -384,12 +376,12 @@ public class CrossITCase extends MultipleProgramsTestBase 
{
                @Override
                public CustomType cross(CustomType first, CustomType second)
                                throws Exception {
-                       
+
                        return new CustomType(first.myInt * second.myInt, 
first.myLong + second.myLong, first.myString + second.myString);
                }
-               
+
        }
-       
+
        public static class MixedCross implements CrossFunction<Tuple5<Integer, 
Long, Integer, String, Long>, CustomType, Tuple3<Integer, Long, String>> {
 
                private static final long serialVersionUID = 1L;
@@ -401,12 +393,12 @@ public class CrossITCase extends MultipleProgramsTestBase 
{
 
                        return new Tuple3<Integer, Long, String>(first.f0 + 
second.myInt, first.f2 * second.myLong, first.f3 + second.myString);
                }
-               
+
        }
-       
-       
+
+
        public static class Tuple3ReturnLeft implements 
CrossFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, 
String, Long>, Tuple3<Integer, Long, String>> {
-               
+
                private static final long serialVersionUID = 1L;
 
                @Override
@@ -417,9 +409,9 @@ public class CrossITCase extends MultipleProgramsTestBase {
                        return first;
                }
        }
-       
+
        public static class Tuple5ReturnRight implements 
CrossFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, 
String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
-               
+
                private static final long serialVersionUID = 1L;
 
                @Override
@@ -427,29 +419,29 @@ public class CrossITCase extends MultipleProgramsTestBase 
{
                                Tuple3<Integer, Long, String> first,
                                Tuple5<Integer, Long, Integer, String, Long> 
second)
                                throws Exception {
-                       
+
                        return second;
                }
 
 
        }
-       
+
        public static class Tuple5CrossBC extends 
RichCrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, 
Long, Integer, String, Long>, Tuple3<Integer, Integer, Integer>> {
 
                private static final long serialVersionUID = 1L;
-               
+
                private int broadcast = 42;
-               
+
                @Override
                public void open(Configuration config) {
-                       
+
                        Collection<Integer> ints = 
this.getRuntimeContext().getBroadcastVariable("ints");
                        int sum = 0;
                        for(Integer i : ints) {
                                sum += i;
                        }
                        broadcast = sum;
-                       
+
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
index 5dc3867..b49bd33 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
@@ -50,7 +50,6 @@ public class DataSinkITCase extends MultipleProgramsTestBase {
        }
 
        private String resultPath;
-       private String expected;
 
        @Rule
        public TemporaryFolder tempFolder = new TemporaryFolder();
@@ -62,7 +61,6 @@ public class DataSinkITCase extends MultipleProgramsTestBase {
 
        @Test
        public void testIntSortingParallelism1() throws Exception {
-
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
                DataSet<Integer> ds = CollectionDataSets.getIntegerDataSet(env);
@@ -70,14 +68,13 @@ public class DataSinkITCase extends 
MultipleProgramsTestBase {
 
                env.execute();
 
-               expected = "5\n5\n5\n5\n5\n4\n4\n4\n4\n3\n3\n3\n2\n2\n1\n";
+               String expected = 
"5\n5\n5\n5\n5\n4\n4\n4\n4\n3\n3\n3\n2\n2\n1\n";
                compareResultsByLinesInMemoryWithStrictOrder(expected, 
resultPath);
 
        }
 
        @Test
        public void testStringSortingParallelism1() throws Exception {
-
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
                DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
@@ -85,7 +82,7 @@ public class DataSinkITCase extends MultipleProgramsTestBase {
 
                env.execute();
 
-               expected = "Hello\n" +
+               String expected = "Hello\n" +
                                "Hello world\n" +
                                "Hello world, how are you?\n" +
                                "Hi\n" +
@@ -99,7 +96,6 @@ public class DataSinkITCase extends MultipleProgramsTestBase {
 
        @Test
        public void testTupleSortingSingleAscParallelism1() throws Exception {
-
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
                DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
@@ -107,7 +103,7 @@ public class DataSinkITCase extends 
MultipleProgramsTestBase {
 
                env.execute();
 
-               expected = "1,1,Hi\n" +
+               String expected = "1,1,Hi\n" +
                                "2,2,Hello\n" +
                                "3,2,Hello world\n" +
                                "4,3,Hello world, how are you?\n" +
@@ -130,12 +126,10 @@ public class DataSinkITCase extends 
MultipleProgramsTestBase {
                                "21,6,Comment#15\n";
 
                compareResultsByLinesInMemoryWithStrictOrder(expected, 
resultPath);
-
        }
 
        @Test
        public void testTupleSortingSingleDescParallelism1() throws Exception {
-
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
                DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
@@ -143,7 +137,7 @@ public class DataSinkITCase extends 
MultipleProgramsTestBase {
 
                env.execute();
 
-               expected = "21,6,Comment#15\n" +
+               String expected = "21,6,Comment#15\n" +
                                "20,6,Comment#14\n" +
                                "19,6,Comment#13\n" +
                                "18,6,Comment#12\n" +
@@ -164,13 +158,12 @@ public class DataSinkITCase extends 
MultipleProgramsTestBase {
                                "3,2,Hello world\n" +
                                "2,2,Hello\n" +
                                "1,1,Hi\n";
-               compareResultsByLinesInMemoryWithStrictOrder(expected, 
resultPath);
 
+               compareResultsByLinesInMemoryWithStrictOrder(expected, 
resultPath);
        }
 
        @Test
        public void testTupleSortingDualParallelism1() throws Exception {
-
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
                DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
@@ -178,7 +171,7 @@ public class DataSinkITCase extends 
MultipleProgramsTestBase {
 
                env.execute();
 
-               expected = "16,6,Comment#10\n" +
+               String expected = "16,6,Comment#10\n" +
                                "17,6,Comment#11\n" +
                                "18,6,Comment#12\n" +
                                "19,6,Comment#13\n" +
@@ -199,13 +192,12 @@ public class DataSinkITCase extends 
MultipleProgramsTestBase {
                                "2,2,Hello\n" +
                                "3,2,Hello world\n" +
                                "1,1,Hi\n";
-               compareResultsByLinesInMemoryWithStrictOrder(expected, 
resultPath);
 
+               compareResultsByLinesInMemoryWithStrictOrder(expected, 
resultPath);
        }
 
        @Test
        public void testTupleSortingNestedParallelism1() throws Exception {
-
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
                DataSet<Tuple3<Tuple2<Integer, Integer>, String, Integer>> ds =
@@ -217,7 +209,7 @@ public class DataSinkITCase extends 
MultipleProgramsTestBase {
 
                env.execute();
 
-               expected =
+               String expected =
                                "((2,1),a,3)\n" +
                                "((2,2),b,4)\n" +
                                "((1,2),a,1)\n" +
@@ -231,7 +223,6 @@ public class DataSinkITCase extends 
MultipleProgramsTestBase {
 
        @Test
        public void testTupleSortingNestedParallelism1_2() throws Exception {
-
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
                DataSet<Tuple3<Tuple2<Integer, Integer>, String, Integer>> ds =
@@ -243,7 +234,7 @@ public class DataSinkITCase extends 
MultipleProgramsTestBase {
 
                env.execute();
 
-               expected =
+               String expected =
                                "((2,1),a,3)\n" +
                                "((1,3),a,2)\n" +
                                "((1,2),a,1)\n" +
@@ -257,7 +248,6 @@ public class DataSinkITCase extends 
MultipleProgramsTestBase {
 
        @Test
        public void testPojoSortingSingleParallelism1() throws Exception {
-
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
                DataSet<CollectionDataSets.POJO> ds = 
CollectionDataSets.getMixedPojoDataSet(env);
@@ -265,7 +255,7 @@ public class DataSinkITCase extends 
MultipleProgramsTestBase {
 
                env.execute();
 
-               expected = "1 First (10,100,1000,One) 10100\n" +
+               String expected = "1 First (10,100,1000,One) 10100\n" +
                                "2 First_ (10,105,1000,One) 10200\n" +
                                "3 First (11,102,3000,One) 10200\n" +
                                "4 First_ (11,106,1000,One) 10300\n" +
@@ -275,12 +265,10 @@ public class DataSinkITCase extends 
MultipleProgramsTestBase {
                                "8 Third_ (30,300,1000,Three) 10100\n";
 
                compareResultsByLinesInMemoryWithStrictOrder(expected, 
resultPath);
-
        }
 
        @Test
        public void testPojoSortingDualParallelism1() throws Exception {
-
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
                DataSet<CollectionDataSets.POJO> ds = 
CollectionDataSets.getMixedPojoDataSet(env);
@@ -291,7 +279,7 @@ public class DataSinkITCase extends 
MultipleProgramsTestBase {
 
                env.execute();
 
-               expected =
+               String expected =
                                "5 First (11,102,2000,One) 10100\n" +
                                "3 First (11,102,3000,One) 10200\n" +
                                "1 First (10,100,1000,One) 10100\n" +
@@ -307,7 +295,6 @@ public class DataSinkITCase extends 
MultipleProgramsTestBase {
 
        @Test
        public void testPojoSortingNestedParallelism1() throws Exception {
-
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
                DataSet<CollectionDataSets.POJO> ds = 
CollectionDataSets.getMixedPojoDataSet(env);
@@ -319,7 +306,7 @@ public class DataSinkITCase extends 
MultipleProgramsTestBase {
 
                env.execute();
 
-               expected =
+               String expected =
                                "2 First_ (10,105,1000,One) 10200\n" +
                                "1 First (10,100,1000,One) 10100\n" +
                                "4 First_ (11,106,1000,One) 10300\n" +
@@ -334,7 +321,6 @@ public class DataSinkITCase extends 
MultipleProgramsTestBase {
 
        @Test
        public void testSortingParallelism4() throws Exception {
-
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
                DataSet<Long> ds = env.generateSequence(0, 1000);

http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSourceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSourceITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSourceITCase.java
index 9755caa..aa40754 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSourceITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSourceITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.javaApiOperators;
 
+import java.util.List;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -30,44 +31,35 @@ import org.junit.Assert;
 /**
  * Tests for the DataSource
  */
-
 public class DataSourceITCase extends JavaProgramTestBase {
 
-       private String resultPath;
        private String inputPath;
-       private String expectedResult;
-
 
        @Override
        protected void preSubmit() throws Exception {
                inputPath = createTempFile("input", "ab\n"
                                + "cd\n"
                                + "ef\n");
-               resultPath = getTempDirPath("result");
        }
 
        @Override
        protected void testProgram() throws Exception {
                /*
-                                * Test passing a configuration object to an 
input format
-                                */
+                * Test passing a configuration object to an input format
+                */
 
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                Configuration ifConf = new Configuration();
                ifConf.setString("prepend", "test");
 
                DataSet<String> ds = env.createInput(new TestInputFormat(new 
Path(inputPath))).withParameters(ifConf);
-               ds.writeAsText(resultPath);
-               env.execute();
+               List<String> result = ds.collect();
 
-               expectedResult= "ab\n"
+               String expectedResult = "ab\n"
                                + "cd\n"
                                + "ef\n";
-       }
-       
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(expectedResult, resultPath);
+
+               compareResultAsText(result, expectedResult);
        }
 
        private static class TestInputFormat extends TextInputFormat {

http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
index 4568ab6..02dbb76 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.javaApiOperators;
 
+import java.util.List;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -32,11 +33,7 @@ import 
org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import 
org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -48,22 +45,6 @@ public class DistinctITCase extends MultipleProgramsTestBase 
{
                super(mode);
        }
 
-       private String resultPath;
-       private String expected;
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception{
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-
        @Test
        public void testCorrectnessOfDistinctOnTuplesWithKeyFieldSelector() 
throws Exception {
                /*
@@ -75,17 +56,17 @@ public class DistinctITCase extends 
MultipleProgramsTestBase {
                DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.getSmall3TupleDataSet(env);
                DataSet<Tuple3<Integer, Long, String>> distinctDs = 
ds.union(ds).distinct(0, 1, 2);
 
-               distinctDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple3<Integer, Long, String>> result = 
distinctDs.collect();
 
-               expected = "1,1,Hi\n" +
+               String expected = "1,1,Hi\n" +
                                "2,2,Hello\n" +
                                "3,2,Hello world\n";
+
+               compareResultAsTuples(result, expected);
        }
 
        @Test
-       public void 
testCorrectnessOfDistinctOnTuplesWithKeyFieldSelectorWithNotAllFieldsSelected()
-       throws Exception{
+       public void 
testCorrectnessOfDistinctOnTuplesWithKeyFieldSelectorWithNotAllFieldsSelected() 
throws Exception{
                /*
                 * check correctness of distinct on tuples with key field 
selector with not all fields selected
                 */
@@ -93,13 +74,14 @@ public class DistinctITCase extends 
MultipleProgramsTestBase {
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
                DataSet<Tuple5<Integer, Long,  Integer, String, Long>> ds = 
CollectionDataSets.getSmall5TupleDataSet(env);
-                               DataSet<Tuple1<Integer>> distinctDs = 
ds.union(ds).distinct(0).project(0);
+               DataSet<Tuple1<Integer>> distinctDs = 
ds.union(ds).distinct(0).project(0);
 
-               distinctDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple1<Integer>> result = distinctDs.collect();
 
-               expected = "1\n" +
+               String expected = "1\n" +
                                "2\n";
+
+               compareResultAsTuples(result, expected);
        }
 
        @Test
@@ -111,15 +93,13 @@ public class DistinctITCase extends 
MultipleProgramsTestBase {
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
                DataSet<Tuple5<Integer, Long,  Integer, String, Long>> ds = 
CollectionDataSets.getSmall5TupleDataSet(env);
-               DataSet<Tuple1<Integer>> reduceDs = ds.union(ds)
-                               .distinct(new KeySelector1()).project(0);
+               DataSet<Tuple1<Integer>> reduceDs = ds.union(ds).distinct(new 
KeySelector1()).project(0);
 
+               List<Tuple1<Integer>> result = reduceDs.collect();
 
-               reduceDs.writeAsCsv(resultPath);
-               env.execute();
+               String expected = "1\n" + "2\n";
 
-               expected = "1\n" +
-                               "2\n";
+               compareResultAsTuples(result, expected);
        }
 
        public static class KeySelector1 implements KeySelector<Tuple5<Integer, 
Long,  Integer, String, Long>, Integer> {
@@ -143,15 +123,16 @@ public class DistinctITCase extends 
MultipleProgramsTestBase {
                                .distinct(new KeySelector3())
                                .map(new Mapper3());
 
-               reduceDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple1<Integer>> result = reduceDs.collect();
 
-               expected = "1\n" +
+               String expected = "1\n" +
                                "2\n" +
                                "3\n" +
                                "4\n" +
                                "5\n" +
                                "6\n";
+
+               compareResultAsTuples(result, expected);
        }
 
        public static class Mapper3 extends RichMapFunction<CustomType, 
Tuple1<Integer>> {
@@ -180,17 +161,17 @@ public class DistinctITCase extends 
MultipleProgramsTestBase {
                DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.getSmall3TupleDataSet(env);
                DataSet<Tuple3<Integer, Long, String>> distinctDs = 
ds.union(ds).distinct();
 
-               distinctDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple3<Integer, Long, String>> result = 
distinctDs.collect();
 
-               expected = "1,1,Hi\n" +
+               String expected = "1,1,Hi\n" +
                                "2,2,Hello\n" +
                                "3,2,Hello world\n";
+
+               compareResultAsTuples(result, expected);
        }
 
        @Test
-       public void 
testCorrectnessOfDistinctOnCustomTypeWithTupleReturningTypeExtractor() throws
-                       Exception{
+       public void 
testCorrectnessOfDistinctOnCustomTypeWithTupleReturningTypeExtractor() throws 
Exception{
                /*
                 * check correctness of distinct on custom type with 
tuple-returning type extractor
                 */
@@ -200,12 +181,11 @@ public class DistinctITCase extends 
MultipleProgramsTestBase {
                DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = 
CollectionDataSets.get5TupleDataSet(env);
                DataSet<Tuple2<Integer, Long>> reduceDs = ds
                                .distinct(new KeySelector2())
-                                               .project(0,4);
+                               .project(0,4);
 
-               reduceDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple2<Integer, Long>> result = reduceDs.collect();
 
-               expected = "1,1\n" +
+               String expected = "1,1\n" +
                                "2,1\n" +
                                "2,2\n" +
                                "3,2\n" +
@@ -215,6 +195,8 @@ public class DistinctITCase extends 
MultipleProgramsTestBase {
                                "5,1\n" +
                                "5,2\n" +
                                "5,3\n";
+
+               compareResultAsTuples(result, expected);
        }
 
        public static class KeySelector2 implements KeySelector<Tuple5<Integer, 
Long, Integer, String, Long>, Tuple2<Integer, Long>> {
@@ -234,14 +216,14 @@ public class DistinctITCase extends 
MultipleProgramsTestBase {
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
                DataSet<Tuple5<Integer, Long,  Integer, String, Long>> ds = 
CollectionDataSets.getSmall5TupleDataSet(env);
-               DataSet<Tuple1<Integer>> reduceDs = ds.union(ds)
-                                               .distinct("f0").project(0);
+               DataSet<Tuple1<Integer>> reduceDs = 
ds.union(ds).distinct("f0").project(0);
 
-               reduceDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple1<Integer>> result = reduceDs.collect();
 
-               expected = "1\n" +
+               String expected = "1\n" +
                                "2\n";
+
+               compareResultAsTuples(result, expected);
        }
 
        @Test
@@ -255,10 +237,11 @@ public class DistinctITCase extends 
MultipleProgramsTestBase {
                DataSet<POJO> ds = 
CollectionDataSets.getDuplicatePojoDataSet(env);
                DataSet<Integer> reduceDs = 
ds.distinct("nestedPojo.longNumber").map(new Mapper2());
 
-               reduceDs.writeAsText(resultPath);
-               env.execute();
+               List<Integer> result = reduceDs.collect();
+
+               String expected = "10000\n20000\n30000\n";
 
-               expected = "10000\n20000\n30000\n";
+               compareResultAsText(result, expected);
        }
 
        public static class Mapper2 implements 
MapFunction<CollectionDataSets.POJO, Integer> {
@@ -278,10 +261,11 @@ public class DistinctITCase extends 
MultipleProgramsTestBase {
                DataSet<POJO> ds = 
CollectionDataSets.getDuplicatePojoDataSet(env);
                DataSet<Integer> reduceDs = ds.distinct().map(new Mapper1());
 
-               reduceDs.writeAsText(resultPath);
-               env.execute();
+               List<Integer> result = reduceDs.collect();
+
+               String expected = "10000\n20000\n30000\n";
 
-               expected = "10000\n20000\n30000\n";
+               compareResultAsText(result, expected);
        }
 
        public static class Mapper1 implements 
MapFunction<CollectionDataSets.POJO, Integer> {

http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java
index c46bc46..993b137 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.test.javaApiOperators;
 
 import java.util.Collection;
+import java.util.List;
 
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.RichFilterFunction;
@@ -27,11 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import 
org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.apache.flink.api.java.DataSet;
@@ -43,22 +40,6 @@ public class FilterITCase extends MultipleProgramsTestBase {
                super(mode);
        }
 
-       private String resultPath;
-       private String expected;
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception{
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-
        @Test
        public void testAllRejectingFilter() throws Exception {
                /*
@@ -71,10 +52,11 @@ public class FilterITCase extends MultipleProgramsTestBase {
                DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
                                filter(new Filter1());
 
-               filterDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple3<Integer, Long, String>> result = filterDs.collect();
 
-               expected = "\n";
+               String expected = "\n";
+
+               compareResultAsTuples(result, expected);
        }
 
        public static class Filter1 implements 
FilterFunction<Tuple3<Integer,Long,String>> {
@@ -97,10 +79,9 @@ public class FilterITCase extends MultipleProgramsTestBase {
                DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
                DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
                                filter(new Filter2());
-               filterDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple3<Integer, Long, String>> result = filterDs.collect();
 
-               expected = "1,1,Hi\n" +
+               String expected = "1,1,Hi\n" +
                                "2,2,Hello\n" +
                                "3,2,Hello world\n" +
                                "4,3,Hello world, how are you?\n" +
@@ -121,6 +102,8 @@ public class FilterITCase extends MultipleProgramsTestBase {
                                "19,6,Comment#13\n" +
                                "20,6,Comment#14\n" +
                                "21,6,Comment#15\n";
+
+               compareResultAsTuples(result, expected);
        }
 
        public static class Filter2 implements 
FilterFunction<Tuple3<Integer,Long,String>> {
@@ -143,12 +126,14 @@ public class FilterITCase extends 
MultipleProgramsTestBase {
                DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
                DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
                                filter(new Filter3());
-               filterDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple3<Integer, Long, String>> result = filterDs.collect();
 
-               expected = "3,2,Hello world\n" +
+               String expected = "3,2,Hello world\n"
+                               +
                                "4,3,Hello world, how are you?\n";
 
+               compareResultAsTuples(result, expected);
+
        }
 
        public static class Filter3 implements 
FilterFunction<Tuple3<Integer,Long,String>> {
@@ -163,18 +148,17 @@ public class FilterITCase extends 
MultipleProgramsTestBase {
        @Test
        public void testFilterOnIntegerTupleField() throws Exception {
                /*
-                                * Test filter on Integer tuple field.
-                                */
+                * Test filter on Integer tuple field.
+                */
 
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
                DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
                DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
                                filter(new Filter4());
-               filterDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple3<Integer, Long, String>> result = filterDs.collect();
 
-               expected = "2,2,Hello\n" +
+               String expected = "2,2,Hello\n" +
                                "4,3,Hello world, how are you?\n" +
                                "6,3,Luke Skywalker\n" +
                                "8,4,Comment#2\n" +
@@ -184,6 +168,8 @@ public class FilterITCase extends MultipleProgramsTestBase {
                                "16,6,Comment#10\n" +
                                "18,6,Comment#12\n" +
                                "20,6,Comment#14\n";
+
+               compareResultAsTuples(result, expected);
        }
 
        public static class Filter4 implements 
FilterFunction<Tuple3<Integer,Long,String>> {
@@ -206,13 +192,14 @@ public class FilterITCase extends 
MultipleProgramsTestBase {
                DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
                DataSet<String> filterDs = ds.
                                filter(new Filter5());
-               filterDs.writeAsText(resultPath);
-               env.execute();
+               List<String> result = filterDs.collect();
 
-               expected = "Hi\n" +
+               String expected = "Hi\n" +
                                "Hello\n" +
                                "Hello world\n" +
                                "Hello world, how are you?\n";
+
+               compareResultAsText(result, expected);
        }
 
        public static class Filter5 implements FilterFunction<String> {
@@ -235,12 +222,14 @@ public class FilterITCase extends 
MultipleProgramsTestBase {
                DataSet<CustomType> ds = 
CollectionDataSets.getCustomTypeDataSet(env);
                DataSet<CustomType> filterDs = ds.
                                filter(new Filter6());
-               filterDs.writeAsText(resultPath);
-               env.execute();
+               List<CustomType> result = filterDs.collect();
 
-               expected = "3,3,Hello world, how are you?\n" +
+               String expected = "3,3,Hello world, how are you?\n"
+                               +
                                "3,4,I am fine.\n" +
                                "3,5,Luke Skywalker\n";
+
+               compareResultAsText(result, expected);
        }
 
        public static class Filter6 implements FilterFunction<CustomType> {
@@ -265,13 +254,14 @@ public class FilterITCase extends 
MultipleProgramsTestBase {
                DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
                DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
                                filter(new 
RichFilter1()).withBroadcastSet(ints, "ints");
-               filterDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple3<Integer, Long, String>> result = filterDs.collect();
 
-               expected = "1,1,Hi\n" +
+               String expected = "1,1,Hi\n" +
                                "2,2,Hello\n" +
                                "3,2,Hello world\n" +
                                "4,3,Hello world, how are you?\n";
+
+               compareResultAsTuples(result, expected);
        }
 
        public static class RichFilter1 extends 
RichFilterFunction<Tuple3<Integer,Long,String>> {
@@ -306,14 +296,15 @@ public class FilterITCase extends 
MultipleProgramsTestBase {
                DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
                DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
                                filter(new 
RichFilter2()).withBroadcastSet(intDs, "ints");
-               filterDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple3<Integer, Long, String>> result = filterDs.collect();
 
-               expected = "11,5,Comment#5\n" +
+               String expected = "11,5,Comment#5\n" +
                                "12,5,Comment#6\n" +
                                "13,5,Comment#7\n" +
                                "14,5,Comment#8\n" +
                                "15,5,Comment#9\n";
+
+               compareResultAsTuples(result, expected);
        }
 
        public static class RichFilter2 extends 
RichFilterFunction<Tuple3<Integer,Long,String>> {

http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
index 15d98dd..3eb870d 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.test.javaApiOperators;
 
+import java.util.List;
+
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.java.DataSet;
@@ -29,11 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -43,22 +41,6 @@ public class FirstNITCase extends MultipleProgramsTestBase {
                super(mode);
        }
 
-       private String resultPath;
-       private String expected;
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception{
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-
        @Test
        public void testFirstNOnUngroupedDS() throws Exception {
                /*
@@ -70,10 +52,11 @@ public class FirstNITCase extends MultipleProgramsTestBase {
                DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
                DataSet<Tuple1<Integer>> seven = ds.first(7).map(new 
OneMapper()).sum(0);
 
-               seven.writeAsText(resultPath);
-               env.execute();
+               List<Tuple1<Integer>> result = seven.collect();
 
-               expected = "(7)\n";
+               String expected = "(7)\n";
+
+               compareResultAsText(result, expected);
        }
 
        @Test
@@ -88,10 +71,11 @@ public class FirstNITCase extends MultipleProgramsTestBase {
                DataSet<Tuple2<Long, Integer>> first = ds.groupBy(1).first(4)
                                .map(new OneMapper2()).groupBy(0).sum(1);
 
-               first.writeAsText(resultPath);
-               env.execute();
+               List<Tuple2<Long, Integer>> result = first.collect();
+
+               String expected = "(1,1)\n(2,2)\n(3,3)\n(4,4)\n(5,4)\n(6,4)\n";
 
-               expected = "(1,1)\n(2,2)\n(3,3)\n(4,4)\n(5,4)\n(6,4)\n";
+               compareResultAsText(result, expected);
        }
 
        @Test
@@ -104,17 +88,18 @@ public class FirstNITCase extends MultipleProgramsTestBase 
{
 
                DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
                DataSet<Tuple2<Long, Integer>> first = 
ds.groupBy(1).sortGroup(0, Order.DESCENDING).first(3)
-                                                                               
                                        .project(1,0);
+                               .project(1,0);
 
-               first.writeAsText(resultPath);
-               env.execute();
+               List<Tuple2<Long, Integer>> result = first.collect();
 
-               expected = "(1,1)\n"
+               String expected = "(1,1)\n"
                                + "(2,3)\n(2,2)\n"
                                + "(3,6)\n(3,5)\n(3,4)\n"
                                + "(4,10)\n(4,9)\n(4,8)\n"
                                + "(5,15)\n(5,14)\n(5,13)\n"
                                + "(6,21)\n(6,20)\n(6,19)\n";
+
+               compareResultAsText(result, expected);
        }
 
        /**
@@ -137,13 +122,13 @@ public class FirstNITCase extends 
MultipleProgramsTestBase {
                        }
                }, Order.DESCENDING).first(1);
 
-               b.writeAsText(resultPath);
-               ee.execute();
+               List<String> result = b.collect();
 
-               expected = "a\nb";
+               String expected = "a\nb";
 
+               compareResultAsText(result, expected);
        }
-       
+
        public static class OneMapper implements MapFunction<Tuple3<Integer, 
Long, String>, Tuple1<Integer>> {
                private static final long serialVersionUID = 1L;
                private final Tuple1<Integer> one = new Tuple1<Integer>(1);
@@ -152,7 +137,7 @@ public class FirstNITCase extends MultipleProgramsTestBase {
                        return one;
                }
        }
-       
+
        public static class OneMapper2 implements MapFunction<Tuple3<Integer, 
Long, String>, Tuple2<Long, Integer>> {
                private static final long serialVersionUID = 1L;
                private final Tuple2<Long, Integer> one = new Tuple2<Long, 
Integer>(0l,1);
@@ -162,5 +147,5 @@ public class FirstNITCase extends MultipleProgramsTestBase {
                        return one;
                }
        }
-       
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java
index 37cf1fc..4962da8 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.test.javaApiOperators;
 
 import java.util.Collection;
+import java.util.List;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
@@ -28,11 +29,7 @@ import 
org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import 
org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.apache.flink.api.java.DataSet;
@@ -44,22 +41,6 @@ public class FlatMapITCase extends MultipleProgramsTestBase {
                super(mode);
        }
 
-       private String resultPath;
-       private String expected;
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception{
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-
        @Test
        public void testNonPassingFlatMap() throws Exception {
                /*
@@ -72,10 +53,11 @@ public class FlatMapITCase extends MultipleProgramsTestBase 
{
                DataSet<String> nonPassingFlatMapDs = ds.
                                flatMap(new FlatMapper1());
 
-               nonPassingFlatMapDs.writeAsText(resultPath);
-               env.execute();
+               List<String> result = nonPassingFlatMapDs.collect();
 
-               expected = "\n";
+               String expected = "\n";
+
+               compareResultAsText(result, expected);
        }
 
        public static class FlatMapper1 implements FlatMapFunction<String, 
String> {
@@ -101,10 +83,9 @@ public class FlatMapITCase extends MultipleProgramsTestBase 
{
                DataSet<String> duplicatingFlatMapDs = ds.
                                flatMap(new FlatMapper2());
 
-               duplicatingFlatMapDs.writeAsText(resultPath);
-               env.execute();
+               List<String> result = duplicatingFlatMapDs.collect();
 
-               expected =      "Hi\n" + "HI\n" +
+               String expected = "Hi\n" + "HI\n" +
                                "Hello\n" + "HELLO\n" +
                                "Hello world\n" + "HELLO WORLD\n" +
                                "Hello world, how are you?\n" + "HELLO WORLD, 
HOW ARE YOU?\n" +
@@ -112,6 +93,8 @@ public class FlatMapITCase extends MultipleProgramsTestBase {
                                "Luke Skywalker\n" + "LUKE SKYWALKER\n" +
                                "Random comment\n" + "RANDOM COMMENT\n" +
                                "LOL\n" + "LOL\n";
+
+               compareResultAsText(result, expected);
        }
 
        public static class FlatMapper2 implements FlatMapFunction<String, 
String> {
@@ -136,10 +119,9 @@ public class FlatMapITCase extends 
MultipleProgramsTestBase {
                DataSet<Tuple3<Integer, Long, String>> varyingTuplesMapDs = ds.
                                flatMap(new FlatMapper3());
 
-               varyingTuplesMapDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple3<Integer, Long, String>> result = 
varyingTuplesMapDs.collect();
 
-               expected =  "1,1,Hi\n" +
+               String expected = "1,1,Hi\n" +
                                "2,2,Hello\n" + "2,2,Hello\n" +
                                "4,3,Hello world, how are you?\n" +
                                "5,3,I am fine.\n" + "5,3,I am fine.\n" +
@@ -153,6 +135,8 @@ public class FlatMapITCase extends MultipleProgramsTestBase 
{
                                "17,6,Comment#11\n" + "17,6,Comment#11\n" +
                                "19,6,Comment#13\n" +
                                "20,6,Comment#14\n" + "20,6,Comment#14\n";
+
+               compareResultAsTuples(result, expected);
        }
 
        public static class FlatMapper3 implements 
FlatMapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
@@ -180,10 +164,9 @@ public class FlatMapITCase extends 
MultipleProgramsTestBase {
                DataSet<Tuple3<Integer, Long, String>> typeConversionFlatMapDs 
= ds.
                                flatMap(new FlatMapper4());
 
-               typeConversionFlatMapDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple3<Integer, Long, String>> result = 
typeConversionFlatMapDs.collect();
 
-               expected =      "1,0,Hi\n" +
+               String expected = "1,0,Hi\n" +
                                "2,1,Hello\n" +
                                "2,2,Hello world\n" +
                                "3,3,Hello world, how are you?\n" +
@@ -204,6 +187,8 @@ public class FlatMapITCase extends MultipleProgramsTestBase 
{
                                "6,18,Comment#13\n" +
                                "6,19,Comment#14\n" +
                                "6,20,Comment#15\n";
+
+               compareResultAsTuples(result, expected);
        }
 
        public static class FlatMapper4 implements FlatMapFunction<CustomType, 
Tuple3<Integer, Long, String>> {
@@ -212,8 +197,7 @@ public class FlatMapITCase extends MultipleProgramsTestBase 
{
                                new Tuple3<Integer, Long, String>();
 
                @Override
-               public void flatMap(CustomType value, Collector<Tuple3<Integer, 
Long, String>> out)
-               throws Exception {
+               public void flatMap(CustomType value, Collector<Tuple3<Integer, 
Long, String>> out) throws Exception {
                        outTuple.setField(value.myInt, 0);
                        outTuple.setField(value.myLong, 1);
                        outTuple.setField(value.myString, 2);
@@ -233,10 +217,10 @@ public class FlatMapITCase extends 
MultipleProgramsTestBase {
                DataSet<String> typeConversionFlatMapDs = ds.
                                flatMap(new FlatMapper5());
 
-               typeConversionFlatMapDs.writeAsText(resultPath);
-               env.execute();
+               List<String> result = typeConversionFlatMapDs.collect();
 
-               expected =      "Hi\n" + "Hello\n" + "Hello world\n" +
+               String expected = "Hi\n" + "Hello\n" + "Hello world\n"
+                               +
                                "Hello world, how are you?\n" +
                                "I am fine.\n" + "Luke Skywalker\n" +
                                "Comment#1\n" + "Comment#2\n" +
@@ -247,21 +231,21 @@ public class FlatMapITCase extends 
MultipleProgramsTestBase {
                                "Comment#11\n" + "Comment#12\n" +
                                "Comment#13\n" + "Comment#14\n" +
                                "Comment#15\n";
+
+               compareResultAsText(result, expected);
        }
 
        public static class FlatMapper5 implements 
FlatMapFunction<Tuple3<Integer, Long, String>,String> {
                private static final long serialVersionUID = 1L;
 
                @Override
-               public void flatMap(Tuple3<Integer, Long, String> value,
-                               Collector<String> out) throws Exception {
+               public void flatMap(Tuple3<Integer, Long, String> value, 
Collector<String> out) throws Exception {
                        out.collect(value.f2);
                }
        }
 
        @Test
-       public void 
testFlatMapperIfUDFReturnsInputObjectMultipleTimesWhileChangingIt() throws
-                       Exception {
+       public void 
testFlatMapperIfUDFReturnsInputObjectMultipleTimesWhileChangingIt() throws 
Exception {
                /*
                 * Test flatmapper if UDF returns input object
                 * multiple times and changes it in between
@@ -273,10 +257,9 @@ public class FlatMapITCase extends 
MultipleProgramsTestBase {
                DataSet<Tuple3<Integer, Long, String>> inputObjFlatMapDs = ds.
                                flatMap(new FlatMapper6());
 
-               inputObjFlatMapDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple3<Integer, Long, String>> result = 
inputObjFlatMapDs.collect();
 
-               expected =      "0,1,Hi\n" +
+               String expected = "0,1,Hi\n" +
                                "0,2,Hello\n" + "1,2,Hello\n" +
                                "0,2,Hello world\n" + "1,2,Hello world\n" + 
"2,2,Hello world\n" +
                                "0,3,I am fine.\n" +
@@ -292,6 +275,8 @@ public class FlatMapITCase extends MultipleProgramsTestBase 
{
                                "0,6,Comment#12\n" + "1,6,Comment#12\n" +
                                "0,6,Comment#13\n" + "1,6,Comment#13\n" + 
"2,6,Comment#13\n" +
                                "0,6,Comment#15\n";
+
+               compareResultAsTuples(result, expected);
        }
 
        public static class FlatMapper6 implements 
FlatMapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
@@ -321,10 +306,9 @@ public class FlatMapITCase extends 
MultipleProgramsTestBase {
                DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
                DataSet<Tuple3<Integer, Long, String>> bcFlatMapDs = ds.
                                flatMap(new 
RichFlatMapper1()).withBroadcastSet(ints, "ints");
-               bcFlatMapDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple3<Integer, Long, String>> result = 
bcFlatMapDs.collect();
 
-               expected =      "55,1,Hi\n" +
+               String expected = "55,1,Hi\n" +
                                "55,2,Hello\n" +
                                "55,2,Hello world\n" +
                                "55,3,Hello world, how are you?\n" +
@@ -345,6 +329,8 @@ public class FlatMapITCase extends MultipleProgramsTestBase 
{
                                "55,6,Comment#13\n" +
                                "55,6,Comment#14\n" +
                                "55,6,Comment#15\n";
+
+               compareResultAsTuples(result, expected);
        }
 
        public static class RichFlatMapper1 extends 
RichFlatMapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>> {
@@ -370,5 +356,5 @@ public class FlatMapITCase extends MultipleProgramsTestBase 
{
                        out.collect(outTuple);
                }
        }
-       
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java
index 3e9fde7..7e6de04 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java
@@ -1,20 +1,20 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.flink.test.javaApiOperators;
 
@@ -32,38 +32,29 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.util.ArrayList;
 import java.util.Arrays;
-
+import java.util.List;
 
 @SuppressWarnings("serial")
 @RunWith(Parameterized.class)
 /**
-* The GroupCombine operator is not easy to test because it is essentially just 
a combiner. The result can be
-* the result of a normal groupReduce at any stage its execution. The basic 
idea is to preserve the grouping key
-* in the partial result, so that we can do a reduceGroup afterwards to 
finalize the results for verification.
-* In addition, we can use hashPartition to partition the data and check if no 
shuffling (just combining) has
-* been performed.
-*/
+ * The GroupCombine operator is not easy to test because it is essentially 
just a combiner. The result can be
+ * the result of a normal groupReduce at any stage its execution. The basic 
idea is to preserve the grouping key
+ * in the partial result, so that we can do a reduceGroup afterwards to 
finalize the results for verification.
+ * In addition, we can use hashPartition to partition the data and check if no 
shuffling (just combining) has
+ * been performed.
+ */
 public class GroupCombineITCase extends MultipleProgramsTestBase {
 
        public GroupCombineITCase(TestExecutionMode mode) {
                super(mode);
        }
 
-       private String resultPath;
-
-       private String expected;
-
        private static String identityResult = "1,1,Hi\n" +
                        "2,2,Hello\n" +
                        "3,2,Hello world\n" +
@@ -86,21 +77,6 @@ public class GroupCombineITCase extends 
MultipleProgramsTestBase {
                        "20,6,Comment#14\n" +
                        "21,6,Comment#15\n";
 
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception {
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       @After
-       public void after() throws Exception {
-               if (expected != null) {
-                       compareResultsByLinesInMemory(expected, resultPath);
-               }
-       }
-
        @Test
        public void testAllGroupCombineIdentity() throws Exception {
 
@@ -108,19 +84,15 @@ public class GroupCombineITCase extends 
MultipleProgramsTestBase {
 
                DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
 
-
                DataSet<Tuple3<Integer, Long, String>> reduceDs = ds
                                // combine
                                .combineGroup(new IdentityFunction())
                                // fully reduce
                                .reduceGroup(new IdentityFunction());
 
+               List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
 
-               reduceDs.writeAsCsv(resultPath);
-
-               env.execute();
-
-               expected = identityResult;
+               compareResultAsTuples(result, identityResult);
        }
 
        @Test
@@ -136,11 +108,9 @@ public class GroupCombineITCase extends 
MultipleProgramsTestBase {
                                // fully reduce
                                .reduceGroup(new IdentityFunction());
 
-               reduceDs.writeAsCsv(resultPath);
-
-               env.execute();
+               List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
 
-               expected = identityResult;
+               compareResultAsTuples(result, identityResult);
        }
 
        @Test
@@ -157,12 +127,9 @@ public class GroupCombineITCase extends 
MultipleProgramsTestBase {
                                // fully reduce
                                .reduceGroup(new IdentityFunction());
 
+               List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
 
-               reduceDs.writeAsCsv(resultPath);
-
-               env.execute();
-
-               expected = identityResult;
+               compareResultAsTuples(result, identityResult);
        }
 
        @Test
@@ -182,11 +149,9 @@ public class GroupCombineITCase extends 
MultipleProgramsTestBase {
                                // fully reduce
                                .reduceGroup(new IdentityFunction());
 
-               reduceDs.writeAsCsv(resultPath);
+               List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
 
-               env.execute();
-
-               expected = identityResult;
+               compareResultAsTuples(result, identityResult);
        }
 
        @Test
@@ -201,7 +166,7 @@ public class GroupCombineITCase extends 
MultipleProgramsTestBase {
                                // wrap values as Kv pairs with the grouping 
key as key
                                .map(new Tuple3KvWrapper());
 
-               dsWrapped
+               List<Tuple3<Integer, Long, String>> result = dsWrapped
                                .groupBy(0)
                                // reduce partially
                                .combineGroup(new Tuple3toTuple3GroupReduce())
@@ -214,19 +179,16 @@ public class GroupCombineITCase extends 
MultipleProgramsTestBase {
                                        public Tuple3<Integer, Long, String> 
map(Tuple2<Long, Tuple3<Integer, Long, String>> value) throws Exception {
                                                return value.f1;
                                        }
-                               })
-                               .writeAsCsv(resultPath);
-
+                               }).collect();
 
-
-               env.execute();
-
-               expected = "1,1,combined\n" +
+               String expected = "1,1,combined\n" +
                                "5,4,combined\n" +
                                "15,9,combined\n" +
                                "34,16,combined\n" +
                                "65,25,combined\n" +
                                "111,36,combined\n";
+
+               compareResultAsTuples(result, expected);
        }
 
        @Test
@@ -241,33 +203,29 @@ public class GroupCombineITCase extends 
MultipleProgramsTestBase {
                                // wrap values as Kv pairs with the grouping 
key as key
                                .map(new Tuple3KvWrapper());
 
-               dsWrapped
+               List<Tuple2<Integer, Long>> result = dsWrapped
                                .groupBy(0)
-                                               // reduce partially
+                               // reduce partially
                                .combineGroup(new Tuple3toTuple2GroupReduce())
                                .groupBy(0)
-                                               // reduce fully to check result
+                               // reduce fully to check result
                                .reduceGroup(new Tuple2toTuple2GroupReduce())
-                                               //unwrap
+                               //unwrap
                                .map(new 
MapFunction<Tuple2<Long,Tuple2<Integer,Long>>, Tuple2<Integer,Long>>() {
                                        @Override
                                        public Tuple2<Integer, Long> 
map(Tuple2<Long, Tuple2<Integer, Long>> value) throws Exception {
                                                return value.f1;
                                        }
-                               })
-                               .writeAsCsv(resultPath);
+                               }).collect();
 
-
-
-               env.execute();
-
-               expected = "1,3\n" +
+               String expected = "1,3\n" +
                                "5,20\n" +
                                "15,58\n" +
                                "34,52\n" +
                                "65,70\n" +
                                "111,96\n";
 
+               compareResultAsTuples(result, expected);
        }
 
        @Test
@@ -284,7 +242,9 @@ public class GroupCombineITCase extends 
MultipleProgramsTestBase {
                // partition and group data
                UnsortedGrouping<Tuple3<Integer, Long, String>> partitionedDS = 
ds.partitionByHash(0).groupBy(1);
 
-               partitionedDS.combineGroup(new 
GroupCombineFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>>() {
+               List<Tuple2<Long, Integer>> result = partitionedDS
+                               .combineGroup(
+                                               new 
GroupCombineFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>>() {
                        @Override
                        public void combine(Iterable<Tuple3<Integer, Long, 
String>> values, Collector<Tuple2<Long, Integer>> out) throws Exception {
                                int count = 0;
@@ -295,29 +255,17 @@ public class GroupCombineITCase extends 
MultipleProgramsTestBase {
                                }
                                out.collect(new Tuple2(key, count));
                        }
-               }).writeAsCsv(resultPath);
-
-               env.execute();
-
-               String notExpected = "6,6\n" +
-                                                       "5,5\n" +
-                                                       "4,4\n" +
-                                                       "3,3\n" +
-                                                       "2,2\n" +
-                                                       "1,1\n";
+               }).collect();
 
-               // check
+               String[] localExpected = new String[] { "(6,6)", "(5,5)" + 
"(4,4)", "(3,3)", "(2,2)", "(1,1)" };
 
-               ArrayList<String> list = new ArrayList<String>();
-               readAllResultLines(list, resultPath);
-
-               String[] result = list.toArray(new String[list.size()]);
-               Arrays.sort(result);
-
-               String[] expected = notExpected.split("\n");
-               Arrays.sort(expected);
+               String[] resultAsStringArray = new String[result.size()];
+               for (int i = 0; i < resultAsStringArray.length; ++i) {
+                       resultAsStringArray[i] = result.get(i).toString();
+               }
+               Arrays.sort(resultAsStringArray);
 
-               Assert.assertEquals("The two arrays were identical.", false, 
Arrays.equals(expected, result));
+               Assert.assertEquals("The two arrays were identical.", false, 
Arrays.equals(localExpected, resultAsStringArray));
        }
 
        @Test
@@ -334,28 +282,29 @@ public class GroupCombineITCase extends 
MultipleProgramsTestBase {
                // partition and group data
                UnsortedGrouping<Tuple3<Integer, Long, String>> partitionedDS = 
ds.partitionByHash(0).groupBy(1);
 
-               partitionedDS.combineGroup(new 
GroupCombineFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>>() {
-                       @Override
-                       public void combine(Iterable<Tuple3<Integer, Long, 
String>> values, Collector<Tuple2<Long, Integer>> out) throws Exception {
-                               int count = 0;
-                               long key = 0;
-                               for (Tuple3<Integer, Long, String> value : 
values) {
-                                       key = value.f1;
-                                       count++;
-                               }
-                               out.collect(new Tuple2(key, count));
-                       }
-               }).writeAsCsv(resultPath);
-
-               env.execute();
+               List<Tuple2<Long, Integer>> result = partitionedDS
+                               .combineGroup(
+                               new GroupCombineFunction<Tuple3<Integer, Long, 
String>, Tuple2<Long, Integer>>() {
+                                       @Override
+                                       public void 
combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Long, 
Integer>> out) throws Exception {
+                                               int count = 0;
+                                               long key = 0;
+                                               for (Tuple3<Integer, Long, 
String> value : values) {
+                                                       key = value.f1;
+                                                       count++;
+                                               }
+                                               out.collect(new Tuple2(key, 
count));
+                                       }
+                               }).collect();
 
-               expected = "6,6\n" +
+               String expected = "6,6\n" +
                                "5,5\n" +
                                "4,4\n" +
                                "3,3\n" +
                                "2,2\n" +
                                "1,1\n";
 
+               compareResultAsTuples(result, expected);
        }
 
        @Test
@@ -373,15 +322,15 @@ public class GroupCombineITCase extends 
MultipleProgramsTestBase {
 
                // all methods on DataSet
                ds.combineGroup(new GroupCombineFunctionExample())
-                               .output(new 
DiscardingOutputFormat<Tuple1<String>>());
+               .output(new DiscardingOutputFormat<Tuple1<String>>());
 
                // all methods on UnsortedGrouping
                ds.groupBy(0).combineGroup(new GroupCombineFunctionExample())
-                               .output(new 
DiscardingOutputFormat<Tuple1<String>>());
+               .output(new DiscardingOutputFormat<Tuple1<String>>());
 
                // all methods on SortedGrouping
                ds.groupBy(0).sortGroup(0, Order.ASCENDING).combineGroup(new 
GroupCombineFunctionExample())
-                               .output(new 
DiscardingOutputFormat<Tuple1<String>>());
+               .output(new DiscardingOutputFormat<Tuple1<String>>());
 
                env.execute();
        }
@@ -407,7 +356,7 @@ public class GroupCombineITCase extends 
MultipleProgramsTestBase {
        }
 
        public static class IdentityFunction implements 
GroupCombineFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, 
String>>,
-                                                                               
                        GroupReduceFunction<Tuple3<Integer, Long, String>, 
Tuple3<Integer, Long, String>> {
+       GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, 
Long, String>> {
 
                @Override
                public void combine(Iterable<Tuple3<Integer, Long, String>> 
values, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
@@ -427,6 +376,7 @@ public class GroupCombineITCase extends 
MultipleProgramsTestBase {
 
        public static class Tuple3toTuple3GroupReduce implements 
KvGroupReduce<Long, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, 
String>, Tuple3<Integer, Long, String>> {
 
+               @Override
                public void combine(Iterable<Tuple2<Long, Tuple3<Integer, Long, 
String>>> values, Collector<Tuple2<Long, Tuple3<Integer, Long, String>>> out) 
throws Exception {
                        int i = 0;
                        long l = 0;
@@ -478,6 +428,7 @@ public class GroupCombineITCase extends 
MultipleProgramsTestBase {
 
        public static class Tuple2toTuple2GroupReduce implements 
KvGroupReduce<Long, Tuple2<Integer, Long>, Tuple2<Integer, Long>, 
Tuple2<Integer, Long>> {
 
+               @Override
                public void combine(Iterable<Tuple2<Long, Tuple2<Integer, 
Long>>> values, Collector<Tuple2<Long, Tuple2<Integer, Long>>> out) throws 
Exception {
                        int i = 0;
                        long l = 0;
@@ -516,5 +467,4 @@ public class GroupCombineITCase extends 
MultipleProgramsTestBase {
        public interface KvGroupReduce<K, V, INT, OUT> extends 
CombineAndReduceGroup<Tuple2<K, V>, Tuple2<K, INT>, Tuple2<K, OUT>> {
        }
 
-
-}
\ No newline at end of file
+}

Reply via email to