http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java index 0232464..4259b63 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java @@ -20,6 +20,7 @@ package org.apache.flink.test.javaApiOperators; import java.util.Collection; import java.util.Date; +import java.util.List; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.MapFunction; @@ -37,12 +38,8 @@ import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.PojoWithDateAndEnum; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.apache.flink.util.Collector; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -54,22 +51,6 @@ public class ReduceITCase extends MultipleProgramsTestBase { super(mode); } - private String resultPath; - private String expected; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expected, resultPath); - } - @Test public void testReduceOnTuplesWithKeyFieldSelector() throws Exception { /* @@ -82,15 +63,16 @@ public class ReduceITCase extends MultipleProgramsTestBase { DataSet<Tuple3<Integer, Long, String>> reduceDs = ds. groupBy(1).reduce(new Tuple3Reduce("B-)")); - reduceDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple3<Integer, Long, String>> result = reduceDs.collect(); - expected = "1,1,Hi\n" + + String expected = "1,1,Hi\n" + "5,2,B-)\n" + "15,3,B-)\n" + "34,4,B-)\n" + "65,5,B-)\n" + "111,6,B-)\n"; + + compareResultAsTuples(result, expected); } @Test @@ -105,10 +87,10 @@ public class ReduceITCase extends MultipleProgramsTestBase { DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds. groupBy(4,0).reduce(new Tuple5Reduce()); - reduceDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple5<Integer, Long, Integer, String, Long>> result = reduceDs + .collect(); - expected = "1,1,0,Hallo,1\n" + + String expected = "1,1,0,Hallo,1\n" + "2,3,2,Hallo Welt wie,1\n" + "2,2,1,Hallo Welt,2\n" + "3,9,0,P-),2\n" + @@ -118,6 +100,8 @@ public class ReduceITCase extends MultipleProgramsTestBase { "5,11,10,GHI,1\n" + "5,29,0,P-),2\n" + "5,25,0,P-),3\n"; + + compareResultAsTuples(result, expected); } @Test @@ -132,15 +116,16 @@ public class ReduceITCase extends MultipleProgramsTestBase { DataSet<Tuple3<Integer, Long, String>> reduceDs = ds. groupBy(new KeySelector1()).reduce(new Tuple3Reduce("B-)")); - reduceDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple3<Integer, Long, String>> result = reduceDs.collect(); - expected = "1,1,Hi\n" + + String expected = "1,1,Hi\n" + "5,2,B-)\n" + "15,3,B-)\n" + "34,4,B-)\n" + "65,5,B-)\n" + "111,6,B-)\n"; + + compareResultAsTuples(result, expected); } public static class KeySelector1 implements KeySelector<Tuple3<Integer,Long,String>, Long> { @@ -163,15 +148,16 @@ public class ReduceITCase extends MultipleProgramsTestBase { DataSet<CustomType> reduceDs = ds. groupBy(new KeySelector2()).reduce(new CustomTypeReduce()); - reduceDs.writeAsText(resultPath); - env.execute(); + List<CustomType> result = reduceDs.collect(); - expected = "1,0,Hi\n" + + String expected = "1,0,Hi\n" + "2,3,Hello!\n" + "3,12,Hello!\n" + "4,30,Hello!\n" + "5,60,Hello!\n" + "6,105,Hello!\n"; + + compareResultAsText(result, expected); } public static class KeySelector2 implements KeySelector<CustomType, Integer> { @@ -194,10 +180,11 @@ public class ReduceITCase extends MultipleProgramsTestBase { DataSet<Tuple3<Integer, Long, String>> reduceDs = ds. reduce(new AllAddingTuple3Reduce()); - reduceDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple3<Integer, Long, String>> result = reduceDs.collect(); - expected = "231,91,Hello World\n"; + String expected = "231,91,Hello World\n"; + + compareResultAsTuples(result, expected); } @Test @@ -212,10 +199,11 @@ public class ReduceITCase extends MultipleProgramsTestBase { DataSet<CustomType> reduceDs = ds. reduce(new AllAddingCustomTypeReduce()); - reduceDs.writeAsText(resultPath); - env.execute(); + List<CustomType> result = reduceDs.collect(); + + String expected = "91,210,Hello!"; - expected = "91,210,Hello!"; + compareResultAsText(result, expected); } @Test @@ -232,15 +220,16 @@ public class ReduceITCase extends MultipleProgramsTestBase { DataSet<Tuple3<Integer, Long, String>> reduceDs = ds. groupBy(1).reduce(new BCTuple3Reduce()).withBroadcastSet(intDs, "ints"); - reduceDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple3<Integer, Long, String>> result = reduceDs.collect(); - expected = "1,1,Hi\n" + + String expected = "1,1,Hi\n" + "5,2,55\n" + "15,3,55\n" + "34,4,55\n" + "65,5,55\n" + "111,6,55\n"; + + compareResultAsTuples(result, expected); } @Test @@ -255,10 +244,10 @@ public class ReduceITCase extends MultipleProgramsTestBase { DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds . groupBy(new KeySelector3()).reduce(new Tuple5Reduce()); - reduceDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple5<Integer, Long, Integer, String, Long>> result = reduceDs + .collect(); - expected = "1,1,0,Hallo,1\n" + + String expected = "1,1,0,Hallo,1\n" + "2,3,2,Hallo Welt wie,1\n" + "2,2,1,Hallo Welt,2\n" + "3,9,0,P-),2\n" + @@ -268,6 +257,8 @@ public class ReduceITCase extends MultipleProgramsTestBase { "5,11,10,GHI,1\n" + "5,29,0,P-),2\n" + "5,25,0,P-),3\n"; + + compareResultAsTuples(result, expected); } public static class KeySelector3 implements KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Tuple2<Integer, Long>> { @@ -291,10 +282,10 @@ public class ReduceITCase extends MultipleProgramsTestBase { DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds. groupBy("f4","f0").reduce(new Tuple5Reduce()); - reduceDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple5<Integer, Long, Integer, String, Long>> result = reduceDs + .collect(); - expected = "1,1,0,Hallo,1\n" + + String expected = "1,1,0,Hallo,1\n" + "2,3,2,Hallo Welt wie,1\n" + "2,2,1,Hallo Welt,2\n" + "3,9,0,P-),2\n" + @@ -304,6 +295,8 @@ public class ReduceITCase extends MultipleProgramsTestBase { "5,11,10,GHI,1\n" + "5,29,0,P-),2\n" + "5,25,0,P-),3\n"; + + compareResultAsTuples(result, expected); } @Test @@ -317,9 +310,11 @@ public class ReduceITCase extends MultipleProgramsTestBase { DataSet<String> res = ds.groupBy("group").reduceGroup(new GroupReducer1()); - res.writeAsText(resultPath); - env.execute(); - expected = "ok\nok"; + List<String> result = res.collect(); + + String expected = "ok\nok"; + + compareResultAsText(result, expected); } public static class Mapper1 implements MapFunction<Long, PojoWithDateAndEnum> { @@ -369,20 +364,20 @@ public class ReduceITCase extends MultipleProgramsTestBase { out.collect("ok"); } } - + public static class Tuple3Reduce implements ReduceFunction<Tuple3<Integer, Long, String>> { private static final long serialVersionUID = 1L; private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>(); private final String f2Replace; - - public Tuple3Reduce() { + + public Tuple3Reduce() { this.f2Replace = null; } - - public Tuple3Reduce(String f2Replace) { + + public Tuple3Reduce(String f2Replace) { this.f2Replace = f2Replace; } - + @Override public Tuple3<Integer, Long, String> reduce( @@ -397,41 +392,41 @@ public class ReduceITCase extends MultipleProgramsTestBase { return out; } } - + public static class Tuple5Reduce implements ReduceFunction<Tuple5<Integer, Long, Integer, String, Long>> { private static final long serialVersionUID = 1L; private final Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<Integer, Long, Integer, String, Long>(); - + @Override public Tuple5<Integer, Long, Integer, String, Long> reduce( Tuple5<Integer, Long, Integer, String, Long> in1, Tuple5<Integer, Long, Integer, String, Long> in2) - throws Exception { - + throws Exception { + out.setFields(in1.f0, in1.f1+in2.f1, 0, "P-)", in1.f4); return out; } } - + public static class CustomTypeReduce implements ReduceFunction<CustomType> { private static final long serialVersionUID = 1L; private final CustomType out = new CustomType(); - + @Override public CustomType reduce(CustomType in1, CustomType in2) throws Exception { - + out.myInt = in1.myInt; out.myLong = in1.myLong + in2.myLong; out.myString = "Hello!"; return out; } } - + public static class AllAddingTuple3Reduce implements ReduceFunction<Tuple3<Integer, Long, String>> { private static final long serialVersionUID = 1L; private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>(); - + @Override public Tuple3<Integer, Long, String> reduce( Tuple3<Integer, Long, String> in1, @@ -441,37 +436,37 @@ public class ReduceITCase extends MultipleProgramsTestBase { return out; } } - + public static class AllAddingCustomTypeReduce implements ReduceFunction<CustomType> { private static final long serialVersionUID = 1L; private final CustomType out = new CustomType(); - + @Override public CustomType reduce(CustomType in1, CustomType in2) throws Exception { - + out.myInt = in1.myInt + in2.myInt; out.myLong = in1.myLong + in2.myLong; out.myString = "Hello!"; return out; } } - + public static class BCTuple3Reduce extends RichReduceFunction<Tuple3<Integer, Long, String>> { private static final long serialVersionUID = 1L; private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>(); private String f2Replace = ""; - + @Override public void open(Configuration config) { - + Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints"); int sum = 0; for(Integer i : ints) { sum += i; } f2Replace = sum+""; - + } @Override @@ -483,5 +478,5 @@ public class ReduceITCase extends MultipleProgramsTestBase { return out; } } - + }
http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java index c7ca37d..8cc54aa 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java @@ -19,6 +19,8 @@ package org.apache.flink.test.javaApiOperators; +import java.util.List; + import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.io.ReplicatingInputFormat; @@ -32,11 +34,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.io.GenericInputSplit; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.apache.flink.util.NumberSequenceIterator; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -51,23 +49,6 @@ public class ReplicatingDataSourceITCase extends MultipleProgramsTestBase { super(mode); } - private String resultPath; - - private String expectedResult; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); // 500500 = 0+1+2+3+...+999+1000 - } - @Test public void testReplicatedSourceToJoin() throws Exception { /* @@ -85,11 +66,11 @@ public class ReplicatingDataSourceITCase extends MultipleProgramsTestBase { .projectFirst(0) .sum(0); - pairs.writeAsText(resultPath); - env.execute(); + List<Tuple> result = pairs.collect(); - expectedResult = "(500500)"; + String expectedResult = "(500500)"; + compareResultAsText(result, expectedResult); } @Test @@ -120,11 +101,11 @@ public class ReplicatingDataSourceITCase extends MultipleProgramsTestBase { }) .sum(0); - pairs.writeAsText(resultPath); - env.execute(); + List<Tuple1<Long>> result = pairs.collect(); - expectedResult = "(500500)"; + String expectedResult = "(500500)"; + compareResultAsText(result, expectedResult); } http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java index d961f3a..2b7226b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java @@ -31,16 +31,13 @@ import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.apache.flink.util.Collector; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.io.Serializable; import java.util.Iterator; +import java.util.List; @RunWith(Parameterized.class) public class SortPartitionITCase extends MultipleProgramsTestBase { @@ -49,22 +46,6 @@ public class SortPartitionITCase extends MultipleProgramsTestBase { super(mode); } - private String resultPath; - private String expected; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expected, resultPath); - } - @Test public void testSortPartitionByKeyField() throws Exception { /* @@ -75,16 +56,15 @@ public class SortPartitionITCase extends MultipleProgramsTestBase { env.setParallelism(4); DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - ds + List result = ds .map(new IdMapper()).setParallelism(4) // parallelize input .sortPartition(1, Order.DESCENDING) .mapPartition(new OrderCheckMapper<Tuple3<Integer, Long, String>>(new Tuple3Checker())) - .distinct() - .writeAsText(resultPath); + .distinct().collect(); - env.execute(); + String expected = "(true)\n"; - expected = "(true)\n"; + compareResultAsText(result, expected); } @Test @@ -97,19 +77,19 @@ public class SortPartitionITCase extends MultipleProgramsTestBase { env.setParallelism(2); DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env); - ds + List result = ds .map(new IdMapper()).setParallelism(2) // parallelize input .sortPartition(4, Order.ASCENDING) .sortPartition(2, Order.DESCENDING) .mapPartition(new OrderCheckMapper<Tuple5<Integer, Long, Integer, String, Long>>(new Tuple5Checker())) - .distinct() - .writeAsText(resultPath); + .distinct().collect(); - env.execute(); + String expected = "(true)\n"; - expected = "(true)\n"; + compareResultAsText(result, expected); } + @SuppressWarnings({ "rawtypes", "unchecked" }) @Test public void testSortPartitionByFieldExpression() throws Exception { /* @@ -120,16 +100,15 @@ public class SortPartitionITCase extends MultipleProgramsTestBase { env.setParallelism(4); DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - ds + List result = ds .map(new IdMapper()).setParallelism(4) // parallelize input .sortPartition("f1", Order.DESCENDING) .mapPartition(new OrderCheckMapper<Tuple3<Integer, Long, String>>(new Tuple3Checker())) - .distinct() - .writeAsText(resultPath); + .distinct().collect(); - env.execute(); + String expected = "(true)\n"; - expected = "(true)\n"; + compareResultAsText(result, expected); } @Test @@ -142,17 +121,16 @@ public class SortPartitionITCase extends MultipleProgramsTestBase { env.setParallelism(2); DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env); - ds + List result = ds .map(new IdMapper()).setParallelism(2) // parallelize input .sortPartition("f4", Order.ASCENDING) .sortPartition("f2", Order.DESCENDING) .mapPartition(new OrderCheckMapper<Tuple5<Integer, Long, Integer, String, Long>>(new Tuple5Checker())) - .distinct() - .writeAsText(resultPath); + .distinct().collect(); - env.execute(); + String expected = "(true)\n"; - expected = "(true)\n"; + compareResultAsText(result, expected); } @Test @@ -165,17 +143,16 @@ public class SortPartitionITCase extends MultipleProgramsTestBase { env.setParallelism(3); DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env); - ds + List result = ds .map(new IdMapper()).setParallelism(3) // parallelize input .sortPartition("f0.f1", Order.ASCENDING) .sortPartition("f1", Order.DESCENDING) .mapPartition(new OrderCheckMapper<Tuple2<Tuple2<Integer, Integer>, String>>(new NestedTupleChecker())) - .distinct() - .writeAsText(resultPath); + .distinct().collect(); - env.execute(); + String expected = "(true)\n"; - expected = "(true)\n"; + compareResultAsText(result, expected); } @Test @@ -188,17 +165,16 @@ public class SortPartitionITCase extends MultipleProgramsTestBase { env.setParallelism(3); DataSet<POJO> ds = CollectionDataSets.getMixedPojoDataSet(env); - ds + List result = ds .map(new IdMapper()).setParallelism(1) // parallelize input .sortPartition("nestedTupleWithCustom.f1.myString", Order.ASCENDING) .sortPartition("number", Order.DESCENDING) .mapPartition(new OrderCheckMapper<POJO>(new PojoChecker())) - .distinct() - .writeAsText(resultPath); + .distinct().collect(); - env.execute(); + String expected = "(true)\n"; - expected = "(true)\n"; + compareResultAsText(result, expected); } @Test @@ -211,15 +187,14 @@ public class SortPartitionITCase extends MultipleProgramsTestBase { env.setParallelism(3); DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - ds + List result = ds .sortPartition(1, Order.DESCENDING).setParallelism(3) // change parallelism .mapPartition(new OrderCheckMapper<Tuple3<Integer, Long, String>>(new Tuple3Checker())) - .distinct() - .writeAsText(resultPath); + .distinct().collect(); - env.execute(); + String expected = "(true)\n"; - expected = "(true)\n"; + compareResultAsText(result, expected); } public static interface OrderChecker<T> extends Serializable { @@ -237,7 +212,7 @@ public class SortPartitionITCase extends MultipleProgramsTestBase { public static class Tuple5Checker implements OrderChecker<Tuple5<Integer, Long, Integer, String, Long>> { @Override public boolean inOrder(Tuple5<Integer, Long, Integer, String, Long> t1, - Tuple5<Integer, Long, Integer, String, Long> t2) { + Tuple5<Integer, Long, Integer, String, Long> t2) { return t1.f4 < t2.f4 || t1.f4 == t2.f4 && t1.f2 >= t2.f2; } } @@ -245,19 +220,18 @@ public class SortPartitionITCase extends MultipleProgramsTestBase { public static class NestedTupleChecker implements OrderChecker<Tuple2<Tuple2<Integer, Integer>, String>> { @Override public boolean inOrder(Tuple2<Tuple2<Integer, Integer>, String> t1, - Tuple2<Tuple2<Integer, Integer>, String> t2) { + Tuple2<Tuple2<Integer, Integer>, String> t2) { return t1.f0.f1 < t2.f0.f1 || t1.f0.f1 == t2.f0.f1 && t1.f1.compareTo(t2.f1) >= 0; - } + } } public static class PojoChecker implements OrderChecker<POJO> { @Override - public boolean inOrder(POJO t1, - POJO t2) { + public boolean inOrder(POJO t1, POJO t2) { return t1.nestedTupleWithCustom.f1.myString.compareTo(t2.nestedTupleWithCustom.f1.myString) < 0 || t1.nestedTupleWithCustom.f1.myString.compareTo(t2.nestedTupleWithCustom.f1.myString) == 0 && - t1.number >= t2.number; + t1.number >= t2.number; } } http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java index e6367c3..e5bdc19 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java @@ -18,6 +18,8 @@ package org.apache.flink.test.javaApiOperators; +import java.util.List; + import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple1; @@ -25,11 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -40,22 +38,6 @@ public class SumMinMaxITCase extends MultipleProgramsTestBase { super(mode); } - private String resultPath; - private String expected; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expected, resultPath); - } - @Test public void testSumMaxAndProject() throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); @@ -66,10 +48,11 @@ public class SumMinMaxITCase extends MultipleProgramsTestBase { .andMax(1) .project(0, 1); - sumDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple2<Integer, Long>> result = sumDs.collect(); + + String expected = "231,6\n"; - expected = "231,6\n"; + compareResultAsTuples(result, expected); } @Test @@ -85,15 +68,16 @@ public class SumMinMaxITCase extends MultipleProgramsTestBase { .sum(0) .project(1, 0); - aggregateDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple2<Long, Integer>> result = aggregateDs.collect(); - expected = "1,1\n" + + String expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"; + + compareResultAsTuples(result, expected); } @Test @@ -110,9 +94,10 @@ public class SumMinMaxITCase extends MultipleProgramsTestBase { .min(0) .project(0); - aggregateDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple1<Integer>> result = aggregateDs.collect(); + + String expected = "1\n"; - expected = "1\n"; + compareResultAsTuples(result, expected); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java index 350227a..a2c10bc 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java @@ -22,6 +22,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.util.Collection; import java.util.LinkedList; +import java.util.List; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; @@ -44,25 +45,14 @@ public class TypeHintITCase extends JavaProgramTestBase { private static int NUM_PROGRAMS = 3; private int curProgId = config.getInteger("ProgramId", -1); - private String resultPath; - private String expectedResult; public TypeHintITCase(Configuration config) { super(config); } @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - @Override protected void testProgram() throws Exception { - expectedResult = TypeHintProgs.runProgram(curProgId, resultPath); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(expectedResult, resultPath); + TypeHintProgs.runProgram(curProgId); } @Parameters @@ -81,7 +71,7 @@ public class TypeHintITCase extends JavaProgramTestBase { private static class TypeHintProgs { - public static String runProgram(int progId, String resultPath) throws Exception { + public static void runProgram(int progId) throws Exception { switch(progId) { // Test identity map with missing types and string type hint case 1: { @@ -91,13 +81,14 @@ public class TypeHintITCase extends JavaProgramTestBase { DataSet<Tuple3<Integer, Long, String>> identityMapDs = ds .map(new Mapper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>()) .returns("Tuple3<Integer, Long, String>"); - identityMapDs.writeAsText(resultPath); - env.execute(); + List<Tuple3<Integer, Long, String>> result = identityMapDs.collect(); - // return expected result - return "(2,2,Hello)\n" + - "(3,2,Hello world)\n" + - "(1,1,Hi)\n"; + String expectedResult = "(2,2,Hello)\n" + + "(3,2,Hello world)\n" + + "(1,1,Hi)\n"; + + compareResultAsText(result, expectedResult); + break; } // Test identity map with missing types and type information type hint case 2: { @@ -108,32 +99,34 @@ public class TypeHintITCase extends JavaProgramTestBase { // all following generics get erased during compilation .map(new Mapper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>()) .returns(new TupleTypeInfo<Tuple3<Integer, Long, String>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)); - identityMapDs.writeAsText(resultPath); - env.execute(); + List<Tuple3<Integer, Long, String>> result = identityMapDs + .collect(); + + String expectedResult = "(2,2,Hello)\n" + + "(3,2,Hello world)\n" + + "(1,1,Hi)\n"; - // return expected result - return "(2,2,Hello)\n" + - "(3,2,Hello world)\n" + - "(1,1,Hi)\n"; + compareResultAsText(result, expectedResult); + break; } // Test flat map with class type hint case 3: { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env); - @SuppressWarnings({ "rawtypes", "unchecked" }) DataSet<Integer> identityMapDs = ds. flatMap(new FlatMapper<Tuple3<Integer, Long, String>, Integer>()) - .returns((Class) Integer.class); - identityMapDs.writeAsText(resultPath); - env.execute(); - - // return expected result - return "2\n" + - "3\n" + - "1\n"; + .returns(Integer.class); + List<Integer> result = identityMapDs.collect(); + + String expectedResult = "2\n" + + "3\n" + + "1\n"; + + compareResultAsText(result, expectedResult); + break; } - default: + default: throw new IllegalArgumentException("Invalid program id"); } } @@ -150,7 +143,7 @@ public class TypeHintITCase extends JavaProgramTestBase { return (V) value; } } - + public static class FlatMapper<T, V> implements FlatMapFunction<T, V> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java index 2e7ae9c..7ab2764 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java @@ -18,15 +18,13 @@ package org.apache.flink.test.javaApiOperators; +import java.util.List; + import org.apache.flink.api.common.functions.RichFilterFunction; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.apache.flink.api.java.DataSet; @@ -61,22 +59,6 @@ public class UnionITCase extends MultipleProgramsTestBase { super(mode); } - private String resultPath; - private String expected; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expected, resultPath); - } - @Test public void testUnion2IdenticalDataSets() throws Exception { /* @@ -87,10 +69,11 @@ public class UnionITCase extends MultipleProgramsTestBase { DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); DataSet<Tuple3<Integer, Long, String>> unionDs = ds.union(CollectionDataSets.get3TupleDataSet(env)); - unionDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple3<Integer, Long, String>> result = unionDs.collect(); + + String expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING; - expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING; + compareResultAsTuples(result, expected); } @Test @@ -107,11 +90,13 @@ public class UnionITCase extends MultipleProgramsTestBase { .union(CollectionDataSets.get3TupleDataSet(env)) .union(CollectionDataSets.get3TupleDataSet(env)); - unionDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple3<Integer, Long, String>> result = unionDs.collect(); - expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING + + String expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING + + FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING; + + compareResultAsTuples(result, expected); } @Test @@ -128,10 +113,11 @@ public class UnionITCase extends MultipleProgramsTestBase { DataSet<Tuple3<Integer, Long, String>> unionDs = CollectionDataSets.get3TupleDataSet(env) .union(empty); - unionDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple3<Integer, Long, String>> result = unionDs.collect(); - expected = FULL_TUPLE_3_STRING; + String expected = FULL_TUPLE_3_STRING; + + compareResultAsTuples(result, expected); } public static class RichFilter1 extends RichFilterFunction<Tuple3<Integer,Long,String>> { @@ -142,5 +128,5 @@ public class UnionITCase extends MultipleProgramsTestBase { return false; } } - + } http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java index a68fd82..1faf4c1 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java @@ -24,6 +24,7 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.Date; import java.util.GregorianCalendar; import java.util.HashMap; @@ -313,6 +314,19 @@ public class CollectionDataSets { } } + public static class CustomTypeComparator implements Comparator<CustomType> { + @Override + public int compare(CustomType o1, CustomType o2) { + int diff = o1.myInt - o2.myInt; + if (diff != 0) { + return diff; + } + diff = (int) (o1.myLong - o2.myLong); + return diff != 0 ? diff : o1.myString.compareTo(o2.myString); + } + + } + public static DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> getSmallTuplebasedDataSet(ExecutionEnvironment env) { List<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> data = new ArrayList<Tuple7<Integer, String, Integer, Integer, Long, String, Long>>(); data.add(new Tuple7<Integer, String, Integer, Integer, Long, String, Long>(1, "First", 10, 100, 1000L, "One", 10000L));
