http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
index 0232464..4259b63 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.test.javaApiOperators;
 
 import java.util.Collection;
 import java.util.Date;
+import java.util.List;
 
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -37,12 +38,8 @@ import 
org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType
 import 
org.apache.flink.test.javaApiOperators.util.CollectionDataSets.PojoWithDateAndEnum;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -54,22 +51,6 @@ public class ReduceITCase extends MultipleProgramsTestBase {
                super(mode);
        }
 
-       private String resultPath;
-       private String expected;
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception{
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-
        @Test
        public void testReduceOnTuplesWithKeyFieldSelector() throws Exception {
                /*
@@ -82,15 +63,16 @@ public class ReduceITCase extends MultipleProgramsTestBase {
                DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
                                groupBy(1).reduce(new Tuple3Reduce("B-)"));
 
-               reduceDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
 
-               expected = "1,1,Hi\n" +
+               String expected = "1,1,Hi\n" +
                                "5,2,B-)\n" +
                                "15,3,B-)\n" +
                                "34,4,B-)\n" +
                                "65,5,B-)\n" +
                                "111,6,B-)\n";
+
+               compareResultAsTuples(result, expected);
        }
 
        @Test
@@ -105,10 +87,10 @@ public class ReduceITCase extends MultipleProgramsTestBase 
{
                DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs 
= ds.
                                groupBy(4,0).reduce(new Tuple5Reduce());
 
-               reduceDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple5<Integer, Long, Integer, String, Long>> result = 
reduceDs
+                               .collect();
 
-               expected = "1,1,0,Hallo,1\n" +
+               String expected = "1,1,0,Hallo,1\n" +
                                "2,3,2,Hallo Welt wie,1\n" +
                                "2,2,1,Hallo Welt,2\n" +
                                "3,9,0,P-),2\n" +
@@ -118,6 +100,8 @@ public class ReduceITCase extends MultipleProgramsTestBase {
                                "5,11,10,GHI,1\n" +
                                "5,29,0,P-),2\n" +
                                "5,25,0,P-),3\n";
+
+               compareResultAsTuples(result, expected);
        }
 
        @Test
@@ -132,15 +116,16 @@ public class ReduceITCase extends 
MultipleProgramsTestBase {
                DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
                                groupBy(new KeySelector1()).reduce(new 
Tuple3Reduce("B-)"));
 
-               reduceDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
 
-               expected = "1,1,Hi\n" +
+               String expected = "1,1,Hi\n" +
                                "5,2,B-)\n" +
                                "15,3,B-)\n" +
                                "34,4,B-)\n" +
                                "65,5,B-)\n" +
                                "111,6,B-)\n";
+
+               compareResultAsTuples(result, expected);
        }
 
        public static class KeySelector1 implements 
KeySelector<Tuple3<Integer,Long,String>, Long> {
@@ -163,15 +148,16 @@ public class ReduceITCase extends 
MultipleProgramsTestBase {
                DataSet<CustomType> reduceDs = ds.
                                groupBy(new KeySelector2()).reduce(new 
CustomTypeReduce());
 
-               reduceDs.writeAsText(resultPath);
-               env.execute();
+               List<CustomType> result = reduceDs.collect();
 
-               expected = "1,0,Hi\n" +
+               String expected = "1,0,Hi\n" +
                                "2,3,Hello!\n" +
                                "3,12,Hello!\n" +
                                "4,30,Hello!\n" +
                                "5,60,Hello!\n" +
                                "6,105,Hello!\n";
+
+               compareResultAsText(result, expected);
        }
 
        public static class KeySelector2 implements KeySelector<CustomType, 
Integer> {
@@ -194,10 +180,11 @@ public class ReduceITCase extends 
MultipleProgramsTestBase {
                DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
                                reduce(new AllAddingTuple3Reduce());
 
-               reduceDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
 
-               expected = "231,91,Hello World\n";
+               String expected = "231,91,Hello World\n";
+
+               compareResultAsTuples(result, expected);
        }
 
        @Test
@@ -212,10 +199,11 @@ public class ReduceITCase extends 
MultipleProgramsTestBase {
                DataSet<CustomType> reduceDs = ds.
                                reduce(new AllAddingCustomTypeReduce());
 
-               reduceDs.writeAsText(resultPath);
-               env.execute();
+               List<CustomType> result = reduceDs.collect();
+
+               String expected = "91,210,Hello!";
 
-               expected = "91,210,Hello!";
+               compareResultAsText(result, expected);
        }
 
        @Test
@@ -232,15 +220,16 @@ public class ReduceITCase extends 
MultipleProgramsTestBase {
                DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
                                groupBy(1).reduce(new 
BCTuple3Reduce()).withBroadcastSet(intDs, "ints");
 
-               reduceDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
 
-               expected = "1,1,Hi\n" +
+               String expected = "1,1,Hi\n" +
                                "5,2,55\n" +
                                "15,3,55\n" +
                                "34,4,55\n" +
                                "65,5,55\n" +
                                "111,6,55\n";
+
+               compareResultAsTuples(result, expected);
        }
 
        @Test
@@ -255,10 +244,10 @@ public class ReduceITCase extends 
MultipleProgramsTestBase {
                DataSet<Tuple5<Integer, Long,  Integer, String, Long>> reduceDs 
= ds .
                                groupBy(new KeySelector3()).reduce(new 
Tuple5Reduce());
 
-               reduceDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple5<Integer, Long, Integer, String, Long>> result = 
reduceDs
+                               .collect();
 
-               expected = "1,1,0,Hallo,1\n" +
+               String expected = "1,1,0,Hallo,1\n" +
                                "2,3,2,Hallo Welt wie,1\n" +
                                "2,2,1,Hallo Welt,2\n" +
                                "3,9,0,P-),2\n" +
@@ -268,6 +257,8 @@ public class ReduceITCase extends MultipleProgramsTestBase {
                                "5,11,10,GHI,1\n" +
                                "5,29,0,P-),2\n" +
                                "5,25,0,P-),3\n";
+
+               compareResultAsTuples(result, expected);
        }
 
        public static class KeySelector3 implements 
KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Tuple2<Integer, Long>> {
@@ -291,10 +282,10 @@ public class ReduceITCase extends 
MultipleProgramsTestBase {
                DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs 
= ds.
                                groupBy("f4","f0").reduce(new Tuple5Reduce());
 
-               reduceDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple5<Integer, Long, Integer, String, Long>> result = 
reduceDs
+                               .collect();
 
-               expected = "1,1,0,Hallo,1\n" +
+               String expected = "1,1,0,Hallo,1\n" +
                                "2,3,2,Hallo Welt wie,1\n" +
                                "2,2,1,Hallo Welt,2\n" +
                                "3,9,0,P-),2\n" +
@@ -304,6 +295,8 @@ public class ReduceITCase extends MultipleProgramsTestBase {
                                "5,11,10,GHI,1\n" +
                                "5,29,0,P-),2\n" +
                                "5,25,0,P-),3\n";
+
+               compareResultAsTuples(result, expected);
        }
 
        @Test
@@ -317,9 +310,11 @@ public class ReduceITCase extends MultipleProgramsTestBase 
{
 
                DataSet<String> res = ds.groupBy("group").reduceGroup(new 
GroupReducer1());
 
-               res.writeAsText(resultPath);
-               env.execute();
-               expected = "ok\nok";
+               List<String> result = res.collect();
+
+               String expected = "ok\nok";
+
+               compareResultAsText(result, expected);
        }
 
        public static class Mapper1 implements MapFunction<Long, 
PojoWithDateAndEnum> {
@@ -369,20 +364,20 @@ public class ReduceITCase extends 
MultipleProgramsTestBase {
                        out.collect("ok");
                }
        }
-       
+
        public static class Tuple3Reduce implements 
ReduceFunction<Tuple3<Integer, Long, String>> {
                private static final long serialVersionUID = 1L;
                private final Tuple3<Integer, Long, String> out = new 
Tuple3<Integer, Long, String>();
                private final String f2Replace;
-               
-               public Tuple3Reduce() { 
+
+               public Tuple3Reduce() {
                        this.f2Replace = null;
                }
-               
-               public Tuple3Reduce(String f2Replace) { 
+
+               public Tuple3Reduce(String f2Replace) {
                        this.f2Replace = f2Replace;
                }
-               
+
 
                @Override
                public Tuple3<Integer, Long, String> reduce(
@@ -397,41 +392,41 @@ public class ReduceITCase extends 
MultipleProgramsTestBase {
                        return out;
                }
        }
-       
+
        public static class Tuple5Reduce implements 
ReduceFunction<Tuple5<Integer, Long, Integer, String, Long>> {
                private static final long serialVersionUID = 1L;
                private final Tuple5<Integer, Long, Integer, String, Long> out 
= new Tuple5<Integer, Long, Integer, String, Long>();
-               
+
                @Override
                public Tuple5<Integer, Long, Integer, String, Long> reduce(
                                Tuple5<Integer, Long, Integer, String, Long> 
in1,
                                Tuple5<Integer, Long, Integer, String, Long> 
in2)
-                               throws Exception {
-                       
+                                               throws Exception {
+
                        out.setFields(in1.f0, in1.f1+in2.f1, 0, "P-)", in1.f4);
                        return out;
                }
        }
-       
+
        public static class CustomTypeReduce implements 
ReduceFunction<CustomType> {
                private static final long serialVersionUID = 1L;
                private final CustomType out = new CustomType();
-               
+
                @Override
                public CustomType reduce(CustomType in1, CustomType in2)
                                throws Exception {
-                       
+
                        out.myInt = in1.myInt;
                        out.myLong = in1.myLong + in2.myLong;
                        out.myString = "Hello!";
                        return out;
                }
        }
-       
+
        public static class AllAddingTuple3Reduce implements 
ReduceFunction<Tuple3<Integer, Long, String>> {
                private static final long serialVersionUID = 1L;
                private final Tuple3<Integer, Long, String> out = new 
Tuple3<Integer, Long, String>();
-               
+
                @Override
                public Tuple3<Integer, Long, String> reduce(
                                Tuple3<Integer, Long, String> in1,
@@ -441,37 +436,37 @@ public class ReduceITCase extends 
MultipleProgramsTestBase {
                        return out;
                }
        }
-       
+
        public static class AllAddingCustomTypeReduce implements 
ReduceFunction<CustomType> {
                private static final long serialVersionUID = 1L;
                private final CustomType out = new CustomType();
-               
+
                @Override
                public CustomType reduce(CustomType in1, CustomType in2)
                                throws Exception {
-                       
+
                        out.myInt = in1.myInt + in2.myInt;
                        out.myLong = in1.myLong + in2.myLong;
                        out.myString = "Hello!";
                        return out;
                }
        }
-       
+
        public static class BCTuple3Reduce extends 
RichReduceFunction<Tuple3<Integer, Long, String>> {
                private static final long serialVersionUID = 1L;
                private final Tuple3<Integer, Long, String> out = new 
Tuple3<Integer, Long, String>();
                private String f2Replace = "";
-               
+
                @Override
                public void open(Configuration config) {
-                       
+
                        Collection<Integer> ints = 
this.getRuntimeContext().getBroadcastVariable("ints");
                        int sum = 0;
                        for(Integer i : ints) {
                                sum += i;
                        }
                        f2Replace = sum+"";
-                       
+
                }
 
                @Override
@@ -483,5 +478,5 @@ public class ReduceITCase extends MultipleProgramsTestBase {
                        return out;
                }
        }
-       
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java
index c7ca37d..8cc54aa 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java
@@ -19,6 +19,8 @@
 package org.apache.flink.test.javaApiOperators;
 
 
+import java.util.List;
+
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.io.ReplicatingInputFormat;
@@ -32,11 +34,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.util.NumberSequenceIterator;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -51,23 +49,6 @@ public class ReplicatingDataSourceITCase extends 
MultipleProgramsTestBase {
                super(mode);
        }
 
-       private String resultPath;
-
-       private String expectedResult;
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception{
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expectedResult, resultPath); // 
500500 = 0+1+2+3+...+999+1000
-       }
-
        @Test
        public void testReplicatedSourceToJoin() throws Exception {
                /*
@@ -85,11 +66,11 @@ public class ReplicatingDataSourceITCase extends 
MultipleProgramsTestBase {
                                .projectFirst(0)
                                .sum(0);
 
-               pairs.writeAsText(resultPath);
-               env.execute();
+               List<Tuple> result = pairs.collect();
 
-               expectedResult = "(500500)";
+               String expectedResult = "(500500)";
 
+               compareResultAsText(result, expectedResult);
        }
 
        @Test
@@ -120,11 +101,11 @@ public class ReplicatingDataSourceITCase extends 
MultipleProgramsTestBase {
                                })
                                .sum(0);
 
-               pairs.writeAsText(resultPath);
-               env.execute();
+               List<Tuple1<Long>> result = pairs.collect();
 
-               expectedResult = "(500500)";
+               String expectedResult = "(500500)";
 
+               compareResultAsText(result, expectedResult);
        }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java
index d961f3a..2b7226b 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java
@@ -31,16 +31,13 @@ import 
org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.io.Serializable;
 import java.util.Iterator;
+import java.util.List;
 
 @RunWith(Parameterized.class)
 public class SortPartitionITCase extends MultipleProgramsTestBase {
@@ -49,22 +46,6 @@ public class SortPartitionITCase extends 
MultipleProgramsTestBase {
                super(mode);
        }
 
-       private String resultPath;
-       private String expected;
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception{
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-
        @Test
        public void testSortPartitionByKeyField() throws Exception {
                /*
@@ -75,16 +56,15 @@ public class SortPartitionITCase extends 
MultipleProgramsTestBase {
                env.setParallelism(4);
 
                DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
-               ds
+               List result = ds
                                .map(new IdMapper()).setParallelism(4) // 
parallelize input
                                .sortPartition(1, Order.DESCENDING)
                                .mapPartition(new 
OrderCheckMapper<Tuple3<Integer, Long, String>>(new Tuple3Checker()))
-                               .distinct()
-                               .writeAsText(resultPath);
+                               .distinct().collect();
 
-               env.execute();
+               String expected = "(true)\n";
 
-               expected = "(true)\n";
+               compareResultAsText(result, expected);
        }
 
        @Test
@@ -97,19 +77,19 @@ public class SortPartitionITCase extends 
MultipleProgramsTestBase {
                env.setParallelism(2);
 
                DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = 
CollectionDataSets.get5TupleDataSet(env);
-               ds
+               List result = ds
                                .map(new IdMapper()).setParallelism(2) // 
parallelize input
                                .sortPartition(4, Order.ASCENDING)
                                .sortPartition(2, Order.DESCENDING)
                                .mapPartition(new 
OrderCheckMapper<Tuple5<Integer, Long, Integer, String, Long>>(new 
Tuple5Checker()))
-                               .distinct()
-                               .writeAsText(resultPath);
+                               .distinct().collect();
 
-               env.execute();
+               String expected = "(true)\n";
 
-               expected = "(true)\n";
+               compareResultAsText(result, expected);
        }
 
+       @SuppressWarnings({ "rawtypes", "unchecked" })
        @Test
        public void testSortPartitionByFieldExpression() throws Exception {
                /*
@@ -120,16 +100,15 @@ public class SortPartitionITCase extends 
MultipleProgramsTestBase {
                env.setParallelism(4);
 
                DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
-               ds
+               List result = ds
                                .map(new IdMapper()).setParallelism(4) // 
parallelize input
                                .sortPartition("f1", Order.DESCENDING)
                                .mapPartition(new 
OrderCheckMapper<Tuple3<Integer, Long, String>>(new Tuple3Checker()))
-                               .distinct()
-                               .writeAsText(resultPath);
+                               .distinct().collect();
 
-               env.execute();
+               String expected = "(true)\n";
 
-               expected = "(true)\n";
+               compareResultAsText(result, expected);
        }
 
        @Test
@@ -142,17 +121,16 @@ public class SortPartitionITCase extends 
MultipleProgramsTestBase {
                env.setParallelism(2);
 
                DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = 
CollectionDataSets.get5TupleDataSet(env);
-               ds
+               List result = ds
                                .map(new IdMapper()).setParallelism(2) // 
parallelize input
                                .sortPartition("f4", Order.ASCENDING)
                                .sortPartition("f2", Order.DESCENDING)
                                .mapPartition(new 
OrderCheckMapper<Tuple5<Integer, Long, Integer, String, Long>>(new 
Tuple5Checker()))
-                               .distinct()
-                               .writeAsText(resultPath);
+                               .distinct().collect();
 
-               env.execute();
+               String expected = "(true)\n";
 
-               expected = "(true)\n";
+               compareResultAsText(result, expected);
        }
 
        @Test
@@ -165,17 +143,16 @@ public class SortPartitionITCase extends 
MultipleProgramsTestBase {
                env.setParallelism(3);
 
                DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = 
CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
-               ds
+               List result = ds
                                .map(new IdMapper()).setParallelism(3) // 
parallelize input
                                .sortPartition("f0.f1", Order.ASCENDING)
                                .sortPartition("f1", Order.DESCENDING)
                                .mapPartition(new 
OrderCheckMapper<Tuple2<Tuple2<Integer, Integer>, String>>(new 
NestedTupleChecker()))
-                               .distinct()
-                               .writeAsText(resultPath);
+                               .distinct().collect();
 
-               env.execute();
+               String expected = "(true)\n";
 
-               expected = "(true)\n";
+               compareResultAsText(result, expected);
        }
 
        @Test
@@ -188,17 +165,16 @@ public class SortPartitionITCase extends 
MultipleProgramsTestBase {
                env.setParallelism(3);
 
                DataSet<POJO> ds = CollectionDataSets.getMixedPojoDataSet(env);
-               ds
+               List result = ds
                                .map(new IdMapper()).setParallelism(1) // 
parallelize input
                                
.sortPartition("nestedTupleWithCustom.f1.myString", Order.ASCENDING)
                                .sortPartition("number", Order.DESCENDING)
                                .mapPartition(new OrderCheckMapper<POJO>(new 
PojoChecker()))
-                               .distinct()
-                               .writeAsText(resultPath);
+                               .distinct().collect();
 
-               env.execute();
+               String expected = "(true)\n";
 
-               expected = "(true)\n";
+               compareResultAsText(result, expected);
        }
 
        @Test
@@ -211,15 +187,14 @@ public class SortPartitionITCase extends 
MultipleProgramsTestBase {
                env.setParallelism(3);
 
                DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
-               ds
+               List result = ds
                                .sortPartition(1, 
Order.DESCENDING).setParallelism(3) // change parallelism
                                .mapPartition(new 
OrderCheckMapper<Tuple3<Integer, Long, String>>(new Tuple3Checker()))
-                               .distinct()
-                               .writeAsText(resultPath);
+                               .distinct().collect();
 
-               env.execute();
+               String expected = "(true)\n";
 
-               expected = "(true)\n";
+               compareResultAsText(result, expected);
        }
 
        public static interface OrderChecker<T> extends Serializable {
@@ -237,7 +212,7 @@ public class SortPartitionITCase extends 
MultipleProgramsTestBase {
        public static class Tuple5Checker implements 
OrderChecker<Tuple5<Integer, Long, Integer, String, Long>> {
                @Override
                public boolean inOrder(Tuple5<Integer, Long, Integer, String, 
Long> t1,
-                                                               Tuple5<Integer, 
Long, Integer, String, Long> t2) {
+                               Tuple5<Integer, Long, Integer, String, Long> 
t2) {
                        return t1.f4 < t2.f4 || t1.f4 == t2.f4 && t1.f2 >= 
t2.f2;
                }
        }
@@ -245,19 +220,18 @@ public class SortPartitionITCase extends 
MultipleProgramsTestBase {
        public static class NestedTupleChecker implements 
OrderChecker<Tuple2<Tuple2<Integer, Integer>, String>> {
                @Override
                public boolean inOrder(Tuple2<Tuple2<Integer, Integer>, String> 
t1,
-                                                               
Tuple2<Tuple2<Integer, Integer>, String> t2) {
+                               Tuple2<Tuple2<Integer, Integer>, String> t2) {
                        return t1.f0.f1 < t2.f0.f1 ||
                                        t1.f0.f1 == t2.f0.f1 && 
t1.f1.compareTo(t2.f1) >= 0;
-               }
+               }
        }
 
        public static class PojoChecker implements OrderChecker<POJO> {
                @Override
-               public boolean inOrder(POJO t1,
-                                                          POJO t2) {
+               public boolean inOrder(POJO t1, POJO t2) {
                        return 
t1.nestedTupleWithCustom.f1.myString.compareTo(t2.nestedTupleWithCustom.f1.myString)
 < 0 ||
                                        
t1.nestedTupleWithCustom.f1.myString.compareTo(t2.nestedTupleWithCustom.f1.myString)
 == 0 &&
-                                                       t1.number >= t2.number;
+                                       t1.number >= t2.number;
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java
index e6367c3..e5bdc19 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.test.javaApiOperators;
 
+import java.util.List;
+
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
@@ -25,11 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -40,22 +38,6 @@ public class SumMinMaxITCase extends 
MultipleProgramsTestBase {
                super(mode);
        }
 
-       private String resultPath;
-       private String expected;
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception{
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-
        @Test
        public void testSumMaxAndProject() throws Exception {
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
@@ -66,10 +48,11 @@ public class SumMinMaxITCase extends 
MultipleProgramsTestBase {
                                .andMax(1)
                                .project(0, 1);
 
-               sumDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple2<Integer, Long>> result = sumDs.collect();
+
+               String expected = "231,6\n";
 
-               expected = "231,6\n";
+               compareResultAsTuples(result, expected);
        }
 
        @Test
@@ -85,15 +68,16 @@ public class SumMinMaxITCase extends 
MultipleProgramsTestBase {
                                .sum(0)
                                .project(1, 0);
 
-               aggregateDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple2<Long, Integer>> result = aggregateDs.collect();
 
-               expected = "1,1\n" +
+               String expected = "1,1\n" +
                                "2,5\n" +
                                "3,15\n" +
                                "4,34\n" +
                                "5,65\n" +
                                "6,111\n";
+
+               compareResultAsTuples(result, expected);
        }
 
        @Test
@@ -110,9 +94,10 @@ public class SumMinMaxITCase extends 
MultipleProgramsTestBase {
                                .min(0)
                                .project(0);
 
-               aggregateDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple1<Integer>> result = aggregateDs.collect();
+
+               String expected = "1\n";
 
-               expected = "1\n";
+               compareResultAsTuples(result, expected);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java
index 350227a..a2c10bc 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java
@@ -22,6 +22,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.LinkedList;
+import java.util.List;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -44,25 +45,14 @@ public class TypeHintITCase extends JavaProgramTestBase {
        private static int NUM_PROGRAMS = 3;
 
        private int curProgId = config.getInteger("ProgramId", -1);
-       private String resultPath;
-       private String expectedResult;
 
        public TypeHintITCase(Configuration config) {
                super(config);
        }
 
        @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-       }
-       @Override
        protected void testProgram() throws Exception {
-               expectedResult = TypeHintProgs.runProgram(curProgId, 
resultPath);
-       }
-
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(expectedResult, resultPath);
+               TypeHintProgs.runProgram(curProgId);
        }
 
        @Parameters
@@ -81,7 +71,7 @@ public class TypeHintITCase extends JavaProgramTestBase {
 
        private static class TypeHintProgs {
 
-               public static String runProgram(int progId, String resultPath) 
throws Exception {
+               public static void runProgram(int progId) throws Exception {
                        switch(progId) {
                        // Test identity map with missing types and string type 
hint
                        case 1: {
@@ -91,13 +81,14 @@ public class TypeHintITCase extends JavaProgramTestBase {
                                DataSet<Tuple3<Integer, Long, String>> 
identityMapDs = ds
                                                .map(new Mapper<Tuple3<Integer, 
Long, String>, Tuple3<Integer, Long, String>>())
                                                .returns("Tuple3<Integer, Long, 
String>");
-                               identityMapDs.writeAsText(resultPath);
-                               env.execute();
+                               List<Tuple3<Integer, Long, String>> result = 
identityMapDs.collect();
 
-                               // return expected result
-                               return "(2,2,Hello)\n" +
-                               "(3,2,Hello world)\n" +
-                               "(1,1,Hi)\n";
+                               String expectedResult = "(2,2,Hello)\n" +
+                                               "(3,2,Hello world)\n" +
+                                               "(1,1,Hi)\n";
+
+                               compareResultAsText(result, expectedResult);
+                               break;
                        }
                        // Test identity map with missing types and type 
information type hint
                        case 2: {
@@ -108,32 +99,34 @@ public class TypeHintITCase extends JavaProgramTestBase {
                                                // all following generics get 
erased during compilation
                                                .map(new Mapper<Tuple3<Integer, 
Long, String>, Tuple3<Integer, Long, String>>())
                                                .returns(new 
TupleTypeInfo<Tuple3<Integer, Long, String>>(BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
-                               identityMapDs.writeAsText(resultPath);
-                               env.execute();
+                               List<Tuple3<Integer, Long, String>> result = 
identityMapDs
+                                               .collect();
+
+                               String expectedResult = "(2,2,Hello)\n" +
+                                               "(3,2,Hello world)\n" +
+                                               "(1,1,Hi)\n";
 
-                               // return expected result
-                               return "(2,2,Hello)\n" +
-                               "(3,2,Hello world)\n" +
-                               "(1,1,Hi)\n";
+                               compareResultAsText(result, expectedResult);
+                               break;
                        }
                        // Test flat map with class type hint
                        case 3: {
                                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
                                DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.getSmall3TupleDataSet(env);
-                               @SuppressWarnings({ "rawtypes", "unchecked" })
                                DataSet<Integer> identityMapDs = ds.
                                flatMap(new FlatMapper<Tuple3<Integer, Long, 
String>, Integer>())
-                               .returns((Class) Integer.class);
-                               identityMapDs.writeAsText(resultPath);
-                               env.execute();
-
-                               // return expected result
-                               return "2\n" +
-                               "3\n" +
-                               "1\n";
+                               .returns(Integer.class);
+                               List<Integer> result = identityMapDs.collect();
+
+                               String expectedResult = "2\n" +
+                                               "3\n" +
+                                               "1\n";
+
+                               compareResultAsText(result, expectedResult);
+                               break;
                        }
-                       default: 
+                       default:
                                throw new IllegalArgumentException("Invalid 
program id");
                        }
                }
@@ -150,7 +143,7 @@ public class TypeHintITCase extends JavaProgramTestBase {
                        return (V) value;
                }
        }
-       
+
        public static class FlatMapper<T, V> implements FlatMapFunction<T, V> {
                private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
index 2e7ae9c..7ab2764 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
@@ -18,15 +18,13 @@
 
 package org.apache.flink.test.javaApiOperators;
 
+import java.util.List;
+
 import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.apache.flink.api.java.DataSet;
@@ -61,22 +59,6 @@ public class UnionITCase extends MultipleProgramsTestBase {
                super(mode);
        }
 
-       private String resultPath;
-       private String expected;
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception{
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-
        @Test
        public void testUnion2IdenticalDataSets() throws Exception {
                /*
@@ -87,10 +69,11 @@ public class UnionITCase extends MultipleProgramsTestBase {
                DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
                DataSet<Tuple3<Integer, Long, String>> unionDs = 
ds.union(CollectionDataSets.get3TupleDataSet(env));
 
-               unionDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple3<Integer, Long, String>> result = unionDs.collect();
+
+               String expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING;
 
-               expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING;
+               compareResultAsTuples(result, expected);
        }
 
        @Test
@@ -107,11 +90,13 @@ public class UnionITCase extends MultipleProgramsTestBase {
                                .union(CollectionDataSets.get3TupleDataSet(env))
                                
.union(CollectionDataSets.get3TupleDataSet(env));
 
-               unionDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple3<Integer, Long, String>> result = unionDs.collect();
 
-               expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING + 
FULL_TUPLE_3_STRING +
+               String expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING
+                               + FULL_TUPLE_3_STRING +
                                FULL_TUPLE_3_STRING +   FULL_TUPLE_3_STRING;
+
+               compareResultAsTuples(result, expected);
        }
 
        @Test
@@ -128,10 +113,11 @@ public class UnionITCase extends MultipleProgramsTestBase 
{
                DataSet<Tuple3<Integer, Long, String>> unionDs = 
CollectionDataSets.get3TupleDataSet(env)
                                .union(empty);
 
-               unionDs.writeAsCsv(resultPath);
-               env.execute();
+               List<Tuple3<Integer, Long, String>> result = unionDs.collect();
 
-               expected = FULL_TUPLE_3_STRING;
+               String expected = FULL_TUPLE_3_STRING;
+
+               compareResultAsTuples(result, expected);
        }
 
        public static class RichFilter1 extends 
RichFilterFunction<Tuple3<Integer,Long,String>> {
@@ -142,5 +128,5 @@ public class UnionITCase extends MultipleProgramsTestBase {
                        return false;
                }
        }
-       
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
index a68fd82..1faf4c1 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
@@ -24,6 +24,7 @@ import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Date;
 import java.util.GregorianCalendar;
 import java.util.HashMap;
@@ -313,6 +314,19 @@ public class CollectionDataSets {
                }
        }
 
+       public static class CustomTypeComparator implements 
Comparator<CustomType> {
+               @Override
+               public int compare(CustomType o1, CustomType o2) {
+                       int diff = o1.myInt - o2.myInt;
+                       if (diff != 0) {
+                               return diff;
+                       }
+                       diff = (int) (o1.myLong - o2.myLong);
+                       return diff != 0 ? diff : 
o1.myString.compareTo(o2.myString);
+               }
+
+       }
+
        public static DataSet<Tuple7<Integer, String, Integer, Integer, Long, 
String, Long>> getSmallTuplebasedDataSet(ExecutionEnvironment env) {
                List<Tuple7<Integer, String, Integer, Integer, Long, String, 
Long>> data = new ArrayList<Tuple7<Integer, String, Integer, Integer, Long, 
String, Long>>();
                data.add(new Tuple7<Integer, String, Integer, Integer, Long, 
String, Long>(1, "First", 10, 100, 1000L, "One", 10000L));

Reply via email to