[FLINK-2275] [tests] Migrated test from execute() to collect() -> for package 'org.apache.flink.test.javaApiOperators'
Deactivated unstable test (see comment section https://issues.apache.org/jira/browse/FLINK-2275) This closes #866 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a137321a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a137321a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a137321a Branch: refs/heads/master Commit: a137321acfce02d62ce48e03b5de37c388152d28 Parents: 4e9e0d6 Author: mjsax <[email protected]> Authored: Thu Jun 25 01:49:23 2015 +0200 Committer: Stephan Ewen <[email protected]> Committed: Wed Jul 1 16:58:26 2015 +0200 ---------------------------------------------------------------------- .../streaming/connectors/kafka/KafkaITCase.java | 2 + .../test/javaApiOperators/AggregateITCase.java | 59 +- .../test/javaApiOperators/CoGroupITCase.java | 8 +- .../test/javaApiOperators/CrossITCase.java | 140 +++-- .../test/javaApiOperators/DataSinkITCase.java | 38 +- .../test/javaApiOperators/DataSourceITCase.java | 22 +- .../test/javaApiOperators/DistinctITCase.java | 100 ++-- .../test/javaApiOperators/FilterITCase.java | 83 ++- .../test/javaApiOperators/FirstNITCase.java | 57 +- .../test/javaApiOperators/FlatMapITCase.java | 82 ++- .../javaApiOperators/GroupCombineITCase.java | 204 +++---- .../javaApiOperators/GroupReduceITCase.java | 532 ++++++++++--------- .../flink/test/javaApiOperators/JoinITCase.java | 320 +++++------ .../flink/test/javaApiOperators/MapITCase.java | 95 ++-- .../test/javaApiOperators/PartitionITCase.java | 75 +-- .../test/javaApiOperators/ProjectITCase.java | 25 +- .../test/javaApiOperators/ReduceITCase.java | 141 +++-- .../ReplicatingDataSourceITCase.java | 35 +- .../javaApiOperators/SortPartitionITCase.java | 96 ++-- .../test/javaApiOperators/SumMinMaxITCase.java | 43 +- .../test/javaApiOperators/TypeHintITCase.java | 65 +-- .../test/javaApiOperators/UnionITCase.java | 46 +- .../util/CollectionDataSets.java | 14 + 23 files changed, 1016 insertions(+), 1266 
deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java index 2af56c1..4b763b2 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java @@ -75,6 +75,7 @@ import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; +import org.junit.Ignore; import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; @@ -224,6 +225,7 @@ public class KafkaITCase { * */ @Test + @Ignore public void testPersistentSourceWithOffsetUpdates() throws Exception { LOG.info("Starting testPersistentSourceWithOffsetUpdates()"); http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java index ea7fc5a..d02f228 100644 --- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java @@ -18,17 +18,15 @@ package org.apache.flink.test.javaApiOperators; +import java.util.List; + import org.apache.flink.api.java.aggregation.Aggregations; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.apache.flink.api.java.DataSet; @@ -38,31 +36,15 @@ import org.apache.flink.api.java.ExecutionEnvironment; public class AggregateITCase extends MultipleProgramsTestBase { - public AggregateITCase(TestExecutionMode mode){ + public AggregateITCase(TestExecutionMode mode) { super(mode); } - private String resultPath; - private String expected; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expected, resultPath); - } - @Test public void testFullAggregate() throws Exception { /* - * Full Aggregate - */ + * Full Aggregate + */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); @@ -70,36 +52,38 @@ public class AggregateITCase extends MultipleProgramsTestBase { DataSet<Tuple2<Integer, Long>> aggregateDs = ds .aggregate(Aggregations.SUM, 0) .and(Aggregations.MAX, 1) - .project(0, 1); + .project(0, 1); + + List<Tuple2<Integer, Long>> result = aggregateDs.collect(); - aggregateDs.writeAsCsv(resultPath); - env.execute(); 
+ String expected = "231,6\n"; - expected = "231,6\n"; + compareResultAsTuples(result, expected); } @Test public void testGroupedAggregate() throws Exception { /* - * Grouped Aggregate - */ + * Grouped Aggregate + */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); DataSet<Tuple2<Long, Integer>> aggregateDs = ds.groupBy(1) .aggregate(Aggregations.SUM, 0) - .project(1, 0); + .project(1, 0); - aggregateDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple2<Long, Integer>> result = aggregateDs.collect(); - expected = "1,1\n" + + String expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"; + + compareResultAsTuples(result, expected); } @Test @@ -114,11 +98,12 @@ public class AggregateITCase extends MultipleProgramsTestBase { DataSet<Tuple1<Integer>> aggregateDs = ds.groupBy(1) .aggregate(Aggregations.MIN, 0) .aggregate(Aggregations.MIN, 0) - .project(0); + .project(0); + + List<Tuple1<Integer>> result = aggregateDs.collect(); - aggregateDs.writeAsCsv(resultPath); - env.execute(); + String expected = "1\n"; - expected = "1\n"; + compareResultAsTuples(result, expected); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java index 7faa6cc..7bc8480 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java @@ -78,8 +78,8 @@ public class CoGroupITCase extends MultipleProgramsTestBase { @Test public void testCoGroupOnTwoCustomTypeInputsWithKeyExtractors() throws 
Exception { /* - * CoGroup on two custom type inputs with key extractors - */ + * CoGroup on two custom type inputs with key extractors + */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); @@ -515,7 +515,9 @@ public class CoGroupITCase extends MultipleProgramsTestBase { compareResultAsText(result, expected); } - + + + // -------------------------------------------------------------------------------------------- // UDF classes // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java index 74868a0..63d1ec7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.test.javaApiOperators; import java.util.Collection; +import java.util.List; import org.apache.flink.api.common.functions.CrossFunction; import org.apache.flink.api.common.functions.RichCrossFunction; @@ -30,11 +31,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType; import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.apache.flink.api.java.DataSet; @@ -47,22 +44,6 @@ public class CrossITCase extends 
MultipleProgramsTestBase { super(mode); } - private String resultPath; - private String expected; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expected, resultPath); - } - @Test public void testCorretnessOfCrossOnTwoTupleInputs() throws Exception { /* @@ -75,10 +56,9 @@ public class CrossITCase extends MultipleProgramsTestBase { DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); DataSet<Tuple2<Integer, String>> crossDs = ds.cross(ds2).with(new Tuple5Cross()); - crossDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple2<Integer, String>> result = crossDs.collect(); - expected = "0,HalloHallo\n" + + String expected = "0,HalloHallo\n" + "1,HalloHallo Welt\n" + "2,HalloHallo Welt wie\n" + "1,Hallo WeltHallo\n" + @@ -87,6 +67,8 @@ public class CrossITCase extends MultipleProgramsTestBase { "2,Hallo Welt wieHallo\n" + "3,Hallo Welt wieHallo Welt\n" + "4,Hallo Welt wieHallo Welt wie\n"; + + compareResultAsTuples(result, expected); } @Test @@ -101,10 +83,9 @@ public class CrossITCase extends MultipleProgramsTestBase { DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); DataSet<Tuple3<Integer, Long, String>> crossDs = ds.cross(ds2).with(new Tuple3ReturnLeft()); - crossDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple3<Integer, Long, String>> result = crossDs.collect(); - expected = "1,1,Hi\n" + + String expected = "1,1,Hi\n" + "1,1,Hi\n" + "1,1,Hi\n" + "2,2,Hello\n" + @@ -113,6 +94,8 @@ public class CrossITCase extends MultipleProgramsTestBase { "3,2,Hello world\n" + "3,2,Hello world\n" + "3,2,Hello world\n"; + + compareResultAsTuples(result, expected); } @Test @@ -127,10 +110,10 @@ public class CrossITCase extends 
MultipleProgramsTestBase { DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); DataSet<Tuple5<Integer, Long, Integer, String, Long>> crossDs = ds.cross(ds2).with(new Tuple5ReturnRight()); - crossDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple5<Integer, Long, Integer, String, Long>> result = crossDs + .collect(); - expected = "1,1,0,Hallo,1\n" + + String expected = "1,1,0,Hallo,1\n" + "1,1,0,Hallo,1\n" + "1,1,0,Hallo,1\n" + "2,2,1,Hallo Welt,2\n" + @@ -140,6 +123,7 @@ public class CrossITCase extends MultipleProgramsTestBase { "2,3,2,Hallo Welt wie,1\n" + "2,3,2,Hallo Welt wie,1\n"; + compareResultAsTuples(result, expected); } @Test @@ -156,10 +140,9 @@ public class CrossITCase extends MultipleProgramsTestBase { DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); DataSet<Tuple3<Integer, Integer, Integer>> crossDs = ds.cross(ds2).with(new Tuple5CrossBC()).withBroadcastSet(intDs, "ints"); - crossDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple3<Integer, Integer, Integer>> result = crossDs.collect(); - expected = "2,0,55\n" + + String expected = "2,0,55\n" + "3,0,55\n" + "3,0,55\n" + "3,0,55\n" + @@ -168,6 +151,8 @@ public class CrossITCase extends MultipleProgramsTestBase { "3,0,55\n" + "4,2,55\n" + "4,4,55\n"; + + compareResultAsTuples(result, expected); } @Test @@ -182,10 +167,9 @@ public class CrossITCase extends MultipleProgramsTestBase { DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); DataSet<Tuple2<Integer, String>> crossDs = ds.crossWithHuge(ds2).with(new Tuple5Cross()); - crossDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple2<Integer, String>> result = crossDs.collect(); - expected = "0,HalloHallo\n" + + String expected = "0,HalloHallo\n" + "1,HalloHallo Welt\n" + "2,HalloHallo Welt wie\n" + "1,Hallo WeltHallo\n" + @@ -194,6 +178,8 @@ public class CrossITCase extends 
MultipleProgramsTestBase { "2,Hallo Welt wieHallo\n" + "3,Hallo Welt wieHallo Welt\n" + "4,Hallo Welt wieHallo Welt wie\n"; + + compareResultAsTuples(result, expected); } @Test @@ -208,10 +194,9 @@ public class CrossITCase extends MultipleProgramsTestBase { DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); DataSet<Tuple2<Integer, String>> crossDs = ds.crossWithTiny(ds2).with(new Tuple5Cross()); - crossDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple2<Integer, String>> result = crossDs.collect(); - expected = "0,HalloHallo\n" + + String expected = "0,HalloHallo\n" + "1,HalloHallo Welt\n" + "2,HalloHallo Welt wie\n" + "1,Hallo WeltHallo\n" + @@ -220,6 +205,8 @@ public class CrossITCase extends MultipleProgramsTestBase { "2,Hallo Welt wieHallo\n" + "3,Hallo Welt wieHallo Welt\n" + "4,Hallo Welt wieHallo Welt wie\n"; + + compareResultAsTuples(result, expected); } @Test @@ -236,12 +223,11 @@ public class CrossITCase extends MultipleProgramsTestBase { .projectFirst(2, 1) .projectSecond(3) .projectFirst(0) - .projectSecond(4,1); + .projectSecond(4,1); - crossDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple6<String, Long, String, Integer, Long, Long>> result = crossDs.collect(); - expected = "Hi,1,Hallo,1,1,1\n" + + String expected = "Hi,1,Hallo,1,1,1\n" + "Hi,1,Hallo Welt,1,2,2\n" + "Hi,1,Hallo Welt wie,1,1,3\n" + "Hello,2,Hallo,2,1,1\n" + @@ -250,6 +236,8 @@ public class CrossITCase extends MultipleProgramsTestBase { "Hello world,2,Hallo,3,1,1\n" + "Hello world,2,Hallo Welt,3,2,2\n" + "Hello world,2,Hallo Welt wie,3,1,3\n"; + + compareResultAsTuples(result, expected); } @Test @@ -266,12 +254,11 @@ public class CrossITCase extends MultipleProgramsTestBase { .projectSecond(3) .projectFirst(2, 1) .projectSecond(4,1) - .projectFirst(0); + .projectFirst(0); - crossDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple6<String, String, Long, Long, Long, Integer>> result = crossDs.collect(); - expected = 
"Hallo,Hi,1,1,1,1\n" + + String expected = "Hallo,Hi,1,1,1,1\n" + "Hallo Welt,Hi,1,2,2,1\n" + "Hallo Welt wie,Hi,1,1,3,1\n" + "Hallo,Hello,2,1,1,2\n" + @@ -281,6 +268,7 @@ public class CrossITCase extends MultipleProgramsTestBase { "Hallo Welt,Hello world,2,2,2,3\n" + "Hallo Welt wie,Hello world,2,1,3,3\n"; + compareResultAsTuples(result, expected); } @Test @@ -295,10 +283,10 @@ public class CrossITCase extends MultipleProgramsTestBase { DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); DataSet<Tuple2<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>>> crossDs = ds.cross(ds2); - crossDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple2<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>>> result = crossDs.collect(); - expected = "(1,1,Hi),(2,2,1,Hallo Welt,2)\n" + + String expected = "(1,1,Hi),(2,2,1,Hallo Welt,2)\n" + + "(1,1,Hi),(1,1,0,Hallo,1)\n" + "(1,1,Hi),(2,3,2,Hallo Welt wie,1)\n" + "(2,2,Hello),(2,2,1,Hallo Welt,2)\n" + @@ -307,6 +295,8 @@ public class CrossITCase extends MultipleProgramsTestBase { "(3,2,Hello world),(2,2,1,Hallo Welt,2)\n" + "(3,2,Hello world),(1,1,0,Hallo,1)\n" + "(3,2,Hello world),(2,3,2,Hallo Welt wie,1)\n"; + + compareResultAsTuples(result, expected); } @Test @@ -321,10 +311,9 @@ public class CrossITCase extends MultipleProgramsTestBase { DataSet<CustomType> ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env); DataSet<CustomType> crossDs = ds.cross(ds2).with(new CustomTypeCross()); - crossDs.writeAsText(resultPath); - env.execute(); + List<CustomType> result = crossDs.collect(); - expected = "1,0,HiHi\n" + String expected = "1,0,HiHi\n" + "2,1,HiHello\n" + "2,2,HiHello world\n" + "2,1,HelloHi\n" @@ -333,6 +322,8 @@ public class CrossITCase extends MultipleProgramsTestBase { + "2,2,Hello worldHi\n" + "4,3,Hello worldHello\n" + "4,4,Hello worldHello world"; + + compareResultAsText(result, expected); } @Test @@ -347,10 
+338,9 @@ public class CrossITCase extends MultipleProgramsTestBase { DataSet<CustomType> ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env); DataSet<Tuple3<Integer, Long, String>> crossDs = ds.cross(ds2).with(new MixedCross()); - crossDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple3<Integer, Long, String>> result = crossDs.collect(); - expected = "2,0,HalloHi\n" + + String expected = "2,0,HalloHi\n" + "3,0,HalloHello\n" + "3,0,HalloHello world\n" + "3,0,Hallo WeltHi\n" + @@ -359,24 +349,26 @@ public class CrossITCase extends MultipleProgramsTestBase { "3,0,Hallo Welt wieHi\n" + "4,2,Hallo Welt wieHello\n" + "4,4,Hallo Welt wieHello world\n"; + + compareResultAsTuples(result, expected); } - + public static class Tuple5Cross implements CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, String>> { private static final long serialVersionUID = 1L; - + @Override public Tuple2<Integer, String> cross( Tuple5<Integer, Long, Integer, String, Long> first, Tuple5<Integer, Long, Integer, String, Long> second) throws Exception { - + return new Tuple2<Integer, String>(first.f2+second.f2, first.f3+second.f3); } } - + public static class CustomTypeCross implements CrossFunction<CustomType, CustomType, CustomType> { private static final long serialVersionUID = 1L; @@ -384,12 +376,12 @@ public class CrossITCase extends MultipleProgramsTestBase { @Override public CustomType cross(CustomType first, CustomType second) throws Exception { - + return new CustomType(first.myInt * second.myInt, first.myLong + second.myLong, first.myString + second.myString); } - + } - + public static class MixedCross implements CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, CustomType, Tuple3<Integer, Long, String>> { private static final long serialVersionUID = 1L; @@ -401,12 +393,12 @@ public class CrossITCase extends MultipleProgramsTestBase { return new Tuple3<Integer, Long, String>(first.f0 + second.myInt, 
first.f2 * second.myLong, first.f3 + second.myString); } - + } - - + + public static class Tuple3ReturnLeft implements CrossFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>> { - + private static final long serialVersionUID = 1L; @Override @@ -417,9 +409,9 @@ public class CrossITCase extends MultipleProgramsTestBase { return first; } } - + public static class Tuple5ReturnRight implements CrossFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> { - + private static final long serialVersionUID = 1L; @Override @@ -427,29 +419,29 @@ public class CrossITCase extends MultipleProgramsTestBase { Tuple3<Integer, Long, String> first, Tuple5<Integer, Long, Integer, String, Long> second) throws Exception { - + return second; } } - + public static class Tuple5CrossBC extends RichCrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Integer, Integer>> { private static final long serialVersionUID = 1L; - + private int broadcast = 42; - + @Override public void open(Configuration config) { - + Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints"); int sum = 0; for(Integer i : ints) { sum += i; } broadcast = sum; - + } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java index 5dc3867..b49bd33 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java @@ -50,7 +50,6 @@ 
public class DataSinkITCase extends MultipleProgramsTestBase { } private String resultPath; - private String expected; @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); @@ -62,7 +61,6 @@ public class DataSinkITCase extends MultipleProgramsTestBase { @Test public void testIntSortingParallelism1() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Integer> ds = CollectionDataSets.getIntegerDataSet(env); @@ -70,14 +68,13 @@ public class DataSinkITCase extends MultipleProgramsTestBase { env.execute(); - expected = "5\n5\n5\n5\n5\n4\n4\n4\n4\n3\n3\n3\n2\n2\n1\n"; + String expected = "5\n5\n5\n5\n5\n4\n4\n4\n4\n3\n3\n3\n2\n2\n1\n"; compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath); } @Test public void testStringSortingParallelism1() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<String> ds = CollectionDataSets.getStringDataSet(env); @@ -85,7 +82,7 @@ public class DataSinkITCase extends MultipleProgramsTestBase { env.execute(); - expected = "Hello\n" + + String expected = "Hello\n" + "Hello world\n" + "Hello world, how are you?\n" + "Hi\n" + @@ -99,7 +96,6 @@ public class DataSinkITCase extends MultipleProgramsTestBase { @Test public void testTupleSortingSingleAscParallelism1() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); @@ -107,7 +103,7 @@ public class DataSinkITCase extends MultipleProgramsTestBase { env.execute(); - expected = "1,1,Hi\n" + + String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, how are you?\n" + @@ -130,12 +126,10 @@ public class DataSinkITCase extends MultipleProgramsTestBase { "21,6,Comment#15\n"; compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath); - } @Test public void testTupleSortingSingleDescParallelism1() 
throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); @@ -143,7 +137,7 @@ public class DataSinkITCase extends MultipleProgramsTestBase { env.execute(); - expected = "21,6,Comment#15\n" + + String expected = "21,6,Comment#15\n" + "20,6,Comment#14\n" + "19,6,Comment#13\n" + "18,6,Comment#12\n" + @@ -164,13 +158,12 @@ public class DataSinkITCase extends MultipleProgramsTestBase { "3,2,Hello world\n" + "2,2,Hello\n" + "1,1,Hi\n"; - compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath); + compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath); } @Test public void testTupleSortingDualParallelism1() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); @@ -178,7 +171,7 @@ public class DataSinkITCase extends MultipleProgramsTestBase { env.execute(); - expected = "16,6,Comment#10\n" + + String expected = "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" + @@ -199,13 +192,12 @@ public class DataSinkITCase extends MultipleProgramsTestBase { "2,2,Hello\n" + "3,2,Hello world\n" + "1,1,Hi\n"; - compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath); + compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath); } @Test public void testTupleSortingNestedParallelism1() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Tuple3<Tuple2<Integer, Integer>, String, Integer>> ds = @@ -217,7 +209,7 @@ public class DataSinkITCase extends MultipleProgramsTestBase { env.execute(); - expected = + String expected = "((2,1),a,3)\n" + "((2,2),b,4)\n" + "((1,2),a,1)\n" + @@ -231,7 +223,6 @@ public class DataSinkITCase extends MultipleProgramsTestBase { @Test public void 
testTupleSortingNestedParallelism1_2() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Tuple3<Tuple2<Integer, Integer>, String, Integer>> ds = @@ -243,7 +234,7 @@ public class DataSinkITCase extends MultipleProgramsTestBase { env.execute(); - expected = + String expected = "((2,1),a,3)\n" + "((1,3),a,2)\n" + "((1,2),a,1)\n" + @@ -257,7 +248,6 @@ public class DataSinkITCase extends MultipleProgramsTestBase { @Test public void testPojoSortingSingleParallelism1() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<CollectionDataSets.POJO> ds = CollectionDataSets.getMixedPojoDataSet(env); @@ -265,7 +255,7 @@ public class DataSinkITCase extends MultipleProgramsTestBase { env.execute(); - expected = "1 First (10,100,1000,One) 10100\n" + + String expected = "1 First (10,100,1000,One) 10100\n" + "2 First_ (10,105,1000,One) 10200\n" + "3 First (11,102,3000,One) 10200\n" + "4 First_ (11,106,1000,One) 10300\n" + @@ -275,12 +265,10 @@ public class DataSinkITCase extends MultipleProgramsTestBase { "8 Third_ (30,300,1000,Three) 10100\n"; compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath); - } @Test public void testPojoSortingDualParallelism1() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<CollectionDataSets.POJO> ds = CollectionDataSets.getMixedPojoDataSet(env); @@ -291,7 +279,7 @@ public class DataSinkITCase extends MultipleProgramsTestBase { env.execute(); - expected = + String expected = "5 First (11,102,2000,One) 10100\n" + "3 First (11,102,3000,One) 10200\n" + "1 First (10,100,1000,One) 10100\n" + @@ -307,7 +295,6 @@ public class DataSinkITCase extends MultipleProgramsTestBase { @Test public void testPojoSortingNestedParallelism1() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<CollectionDataSets.POJO> ds = 
CollectionDataSets.getMixedPojoDataSet(env); @@ -319,7 +306,7 @@ public class DataSinkITCase extends MultipleProgramsTestBase { env.execute(); - expected = + String expected = "2 First_ (10,105,1000,One) 10200\n" + "1 First (10,100,1000,One) 10100\n" + "4 First_ (11,106,1000,One) 10300\n" + @@ -334,7 +321,6 @@ public class DataSinkITCase extends MultipleProgramsTestBase { @Test public void testSortingParallelism4() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Long> ds = env.generateSequence(0, 1000); http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSourceITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSourceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSourceITCase.java index 9755caa..aa40754 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSourceITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSourceITCase.java @@ -18,6 +18,7 @@ package org.apache.flink.test.javaApiOperators; +import java.util.List; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -30,44 +31,35 @@ import org.junit.Assert; /** * Tests for the DataSource */ - public class DataSourceITCase extends JavaProgramTestBase { - private String resultPath; private String inputPath; - private String expectedResult; - @Override protected void preSubmit() throws Exception { inputPath = createTempFile("input", "ab\n" + "cd\n" + "ef\n"); - resultPath = getTempDirPath("result"); } @Override protected void testProgram() throws Exception { /* - * Test passing a configuration object to an input format - */ + * Test passing a configuration object to an input format + */ final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment(); Configuration ifConf = new Configuration(); ifConf.setString("prepend", "test"); DataSet<String> ds = env.createInput(new TestInputFormat(new Path(inputPath))).withParameters(ifConf); - ds.writeAsText(resultPath); - env.execute(); + List<String> result = ds.collect(); - expectedResult= "ab\n" + String expectedResult = "ab\n" + "cd\n" + "ef\n"; - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(expectedResult, resultPath); + + compareResultAsText(result, expectedResult); } private static class TestInputFormat extends TextInputFormat { http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java index 4568ab6..02dbb76 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java @@ -18,6 +18,7 @@ package org.apache.flink.test.javaApiOperators; +import java.util.List; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -32,11 +33,7 @@ import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO; import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -48,22 +45,6 @@ public class 
DistinctITCase extends MultipleProgramsTestBase { super(mode); } - private String resultPath; - private String expected; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expected, resultPath); - } - @Test public void testCorrectnessOfDistinctOnTuplesWithKeyFieldSelector() throws Exception { /* @@ -75,17 +56,17 @@ public class DistinctITCase extends MultipleProgramsTestBase { DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env); DataSet<Tuple3<Integer, Long, String>> distinctDs = ds.union(ds).distinct(0, 1, 2); - distinctDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple3<Integer, Long, String>> result = distinctDs.collect(); - expected = "1,1,Hi\n" + + String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n"; + + compareResultAsTuples(result, expected); } @Test - public void testCorrectnessOfDistinctOnTuplesWithKeyFieldSelectorWithNotAllFieldsSelected() - throws Exception{ + public void testCorrectnessOfDistinctOnTuplesWithKeyFieldSelectorWithNotAllFieldsSelected() throws Exception{ /* * check correctness of distinct on tuples with key field selector with not all fields selected */ @@ -93,13 +74,14 @@ public class DistinctITCase extends MultipleProgramsTestBase { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env); - DataSet<Tuple1<Integer>> distinctDs = ds.union(ds).distinct(0).project(0); + DataSet<Tuple1<Integer>> distinctDs = ds.union(ds).distinct(0).project(0); - distinctDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple1<Integer>> result = distinctDs.collect(); - expected = "1\n" + + String expected = "1\n" + "2\n"; + + compareResultAsTuples(result, 
expected); } @Test @@ -111,15 +93,13 @@ public class DistinctITCase extends MultipleProgramsTestBase { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env); - DataSet<Tuple1<Integer>> reduceDs = ds.union(ds) - .distinct(new KeySelector1()).project(0); + DataSet<Tuple1<Integer>> reduceDs = ds.union(ds).distinct(new KeySelector1()).project(0); + List<Tuple1<Integer>> result = reduceDs.collect(); - reduceDs.writeAsCsv(resultPath); - env.execute(); + String expected = "1\n" + "2\n"; - expected = "1\n" + - "2\n"; + compareResultAsTuples(result, expected); } public static class KeySelector1 implements KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Integer> { @@ -143,15 +123,16 @@ public class DistinctITCase extends MultipleProgramsTestBase { .distinct(new KeySelector3()) .map(new Mapper3()); - reduceDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple1<Integer>> result = reduceDs.collect(); - expected = "1\n" + + String expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"; + + compareResultAsTuples(result, expected); } public static class Mapper3 extends RichMapFunction<CustomType, Tuple1<Integer>> { @@ -180,17 +161,17 @@ public class DistinctITCase extends MultipleProgramsTestBase { DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env); DataSet<Tuple3<Integer, Long, String>> distinctDs = ds.union(ds).distinct(); - distinctDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple3<Integer, Long, String>> result = distinctDs.collect(); - expected = "1,1,Hi\n" + + String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n"; + + compareResultAsTuples(result, expected); } @Test - public void testCorrectnessOfDistinctOnCustomTypeWithTupleReturningTypeExtractor() throws - Exception{ + public void testCorrectnessOfDistinctOnCustomTypeWithTupleReturningTypeExtractor() throws Exception{ /* 
* check correctness of distinct on custom type with tuple-returning type extractor */ @@ -200,12 +181,11 @@ public class DistinctITCase extends MultipleProgramsTestBase { DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env); DataSet<Tuple2<Integer, Long>> reduceDs = ds .distinct(new KeySelector2()) - .project(0,4); + .project(0,4); - reduceDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple2<Integer, Long>> result = reduceDs.collect(); - expected = "1,1\n" + + String expected = "1,1\n" + "2,1\n" + "2,2\n" + "3,2\n" + @@ -215,6 +195,8 @@ public class DistinctITCase extends MultipleProgramsTestBase { "5,1\n" + "5,2\n" + "5,3\n"; + + compareResultAsTuples(result, expected); } public static class KeySelector2 implements KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>> { @@ -234,14 +216,14 @@ public class DistinctITCase extends MultipleProgramsTestBase { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env); - DataSet<Tuple1<Integer>> reduceDs = ds.union(ds) - .distinct("f0").project(0); + DataSet<Tuple1<Integer>> reduceDs = ds.union(ds).distinct("f0").project(0); - reduceDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple1<Integer>> result = reduceDs.collect(); - expected = "1\n" + + String expected = "1\n" + "2\n"; + + compareResultAsTuples(result, expected); } @Test @@ -255,10 +237,11 @@ public class DistinctITCase extends MultipleProgramsTestBase { DataSet<POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env); DataSet<Integer> reduceDs = ds.distinct("nestedPojo.longNumber").map(new Mapper2()); - reduceDs.writeAsText(resultPath); - env.execute(); + List<Integer> result = reduceDs.collect(); + + String expected = "10000\n20000\n30000\n"; - expected = "10000\n20000\n30000\n"; + compareResultAsText(result, expected); } public static class Mapper2 
implements MapFunction<CollectionDataSets.POJO, Integer> { @@ -278,10 +261,11 @@ public class DistinctITCase extends MultipleProgramsTestBase { DataSet<POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env); DataSet<Integer> reduceDs = ds.distinct().map(new Mapper1()); - reduceDs.writeAsText(resultPath); - env.execute(); + List<Integer> result = reduceDs.collect(); + + String expected = "10000\n20000\n30000\n"; - expected = "10000\n20000\n30000\n"; + compareResultAsText(result, expected); } public static class Mapper1 implements MapFunction<CollectionDataSets.POJO, Integer> { http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java index c46bc46..993b137 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.test.javaApiOperators; import java.util.Collection; +import java.util.List; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.RichFilterFunction; @@ -27,11 +28,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType; import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.apache.flink.api.java.DataSet; @@ -43,22 +40,6 @@ public 
class FilterITCase extends MultipleProgramsTestBase { super(mode); } - private String resultPath; - private String expected; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expected, resultPath); - } - @Test public void testAllRejectingFilter() throws Exception { /* @@ -71,10 +52,11 @@ public class FilterITCase extends MultipleProgramsTestBase { DataSet<Tuple3<Integer, Long, String>> filterDs = ds. filter(new Filter1()); - filterDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple3<Integer, Long, String>> result = filterDs.collect(); - expected = "\n"; + String expected = "\n"; + + compareResultAsTuples(result, expected); } public static class Filter1 implements FilterFunction<Tuple3<Integer,Long,String>> { @@ -97,10 +79,9 @@ public class FilterITCase extends MultipleProgramsTestBase { DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); DataSet<Tuple3<Integer, Long, String>> filterDs = ds. filter(new Filter2()); - filterDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple3<Integer, Long, String>> result = filterDs.collect(); - expected = "1,1,Hi\n" + + String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, how are you?\n" + @@ -121,6 +102,8 @@ public class FilterITCase extends MultipleProgramsTestBase { "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"; + + compareResultAsTuples(result, expected); } public static class Filter2 implements FilterFunction<Tuple3<Integer,Long,String>> { @@ -143,12 +126,14 @@ public class FilterITCase extends MultipleProgramsTestBase { DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); DataSet<Tuple3<Integer, Long, String>> filterDs = ds. 
filter(new Filter3()); - filterDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple3<Integer, Long, String>> result = filterDs.collect(); - expected = "3,2,Hello world\n" + + String expected = "3,2,Hello world\n" + + "4,3,Hello world, how are you?\n"; + compareResultAsTuples(result, expected); + } public static class Filter3 implements FilterFunction<Tuple3<Integer,Long,String>> { @@ -163,18 +148,17 @@ public class FilterITCase extends MultipleProgramsTestBase { @Test public void testFilterOnIntegerTupleField() throws Exception { /* - * Test filter on Integer tuple field. - */ + * Test filter on Integer tuple field. + */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); DataSet<Tuple3<Integer, Long, String>> filterDs = ds. filter(new Filter4()); - filterDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple3<Integer, Long, String>> result = filterDs.collect(); - expected = "2,2,Hello\n" + + String expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4,Comment#2\n" + @@ -184,6 +168,8 @@ public class FilterITCase extends MultipleProgramsTestBase { "16,6,Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"; + + compareResultAsTuples(result, expected); } public static class Filter4 implements FilterFunction<Tuple3<Integer,Long,String>> { @@ -206,13 +192,14 @@ public class FilterITCase extends MultipleProgramsTestBase { DataSet<String> ds = CollectionDataSets.getStringDataSet(env); DataSet<String> filterDs = ds. 
filter(new Filter5()); - filterDs.writeAsText(resultPath); - env.execute(); + List<String> result = filterDs.collect(); - expected = "Hi\n" + + String expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n"; + + compareResultAsText(result, expected); } public static class Filter5 implements FilterFunction<String> { @@ -235,12 +222,14 @@ public class FilterITCase extends MultipleProgramsTestBase { DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env); DataSet<CustomType> filterDs = ds. filter(new Filter6()); - filterDs.writeAsText(resultPath); - env.execute(); + List<CustomType> result = filterDs.collect(); - expected = "3,3,Hello world, how are you?\n" + + String expected = "3,3,Hello world, how are you?\n" + + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n"; + + compareResultAsText(result, expected); } public static class Filter6 implements FilterFunction<CustomType> { @@ -265,13 +254,14 @@ public class FilterITCase extends MultipleProgramsTestBase { DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); DataSet<Tuple3<Integer, Long, String>> filterDs = ds. filter(new RichFilter1()).withBroadcastSet(ints, "ints"); - filterDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple3<Integer, Long, String>> result = filterDs.collect(); - expected = "1,1,Hi\n" + + String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"; + + compareResultAsTuples(result, expected); } public static class RichFilter1 extends RichFilterFunction<Tuple3<Integer,Long,String>> { @@ -306,14 +296,15 @@ public class FilterITCase extends MultipleProgramsTestBase { DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); DataSet<Tuple3<Integer, Long, String>> filterDs = ds. 
filter(new RichFilter2()).withBroadcastSet(intDs, "ints"); - filterDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple3<Integer, Long, String>> result = filterDs.collect(); - expected = "11,5,Comment#5\n" + + String expected = "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5,Comment#9\n"; + + compareResultAsTuples(result, expected); } public static class RichFilter2 extends RichFilterFunction<Tuple3<Integer,Long,String>> { http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java index 15d98dd..3eb870d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java @@ -18,6 +18,8 @@ package org.apache.flink.test.javaApiOperators; +import java.util.List; + import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.java.DataSet; @@ -29,11 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -43,22 +41,6 @@ public class FirstNITCase extends MultipleProgramsTestBase { super(mode); } - private String resultPath; - private String expected; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - 
- @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expected, resultPath); - } - @Test public void testFirstNOnUngroupedDS() throws Exception { /* @@ -70,10 +52,11 @@ public class FirstNITCase extends MultipleProgramsTestBase { DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); DataSet<Tuple1<Integer>> seven = ds.first(7).map(new OneMapper()).sum(0); - seven.writeAsText(resultPath); - env.execute(); + List<Tuple1<Integer>> result = seven.collect(); - expected = "(7)\n"; + String expected = "(7)\n"; + + compareResultAsText(result, expected); } @Test @@ -88,10 +71,11 @@ public class FirstNITCase extends MultipleProgramsTestBase { DataSet<Tuple2<Long, Integer>> first = ds.groupBy(1).first(4) .map(new OneMapper2()).groupBy(0).sum(1); - first.writeAsText(resultPath); - env.execute(); + List<Tuple2<Long, Integer>> result = first.collect(); + + String expected = "(1,1)\n(2,2)\n(3,3)\n(4,4)\n(5,4)\n(6,4)\n"; - expected = "(1,1)\n(2,2)\n(3,3)\n(4,4)\n(5,4)\n(6,4)\n"; + compareResultAsText(result, expected); } @Test @@ -104,17 +88,18 @@ public class FirstNITCase extends MultipleProgramsTestBase { DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); DataSet<Tuple2<Long, Integer>> first = ds.groupBy(1).sortGroup(0, Order.DESCENDING).first(3) - .project(1,0); + .project(1,0); - first.writeAsText(resultPath); - env.execute(); + List<Tuple2<Long, Integer>> result = first.collect(); - expected = "(1,1)\n" + String expected = "(1,1)\n" + "(2,3)\n(2,2)\n" + "(3,6)\n(3,5)\n(3,4)\n" + "(4,10)\n(4,9)\n(4,8)\n" + "(5,15)\n(5,14)\n(5,13)\n" + "(6,21)\n(6,20)\n(6,19)\n"; + + compareResultAsText(result, expected); } /** @@ -137,13 +122,13 @@ public class FirstNITCase extends MultipleProgramsTestBase { } }, Order.DESCENDING).first(1); - b.writeAsText(resultPath); - ee.execute(); + 
List<String> result = b.collect(); - expected = "a\nb"; + String expected = "a\nb"; + compareResultAsText(result, expected); } - + public static class OneMapper implements MapFunction<Tuple3<Integer, Long, String>, Tuple1<Integer>> { private static final long serialVersionUID = 1L; private final Tuple1<Integer> one = new Tuple1<Integer>(1); @@ -152,7 +137,7 @@ public class FirstNITCase extends MultipleProgramsTestBase { return one; } } - + public static class OneMapper2 implements MapFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>> { private static final long serialVersionUID = 1L; private final Tuple2<Long, Integer> one = new Tuple2<Long, Integer>(0l,1); @@ -162,5 +147,5 @@ public class FirstNITCase extends MultipleProgramsTestBase { return one; } } - + } http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java index 37cf1fc..4962da8 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.test.javaApiOperators; import java.util.Collection; +import java.util.List; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; @@ -28,11 +29,7 @@ import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.apache.flink.util.Collector; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; import 
org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.apache.flink.api.java.DataSet; @@ -44,22 +41,6 @@ public class FlatMapITCase extends MultipleProgramsTestBase { super(mode); } - private String resultPath; - private String expected; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expected, resultPath); - } - @Test public void testNonPassingFlatMap() throws Exception { /* @@ -72,10 +53,11 @@ public class FlatMapITCase extends MultipleProgramsTestBase { DataSet<String> nonPassingFlatMapDs = ds. flatMap(new FlatMapper1()); - nonPassingFlatMapDs.writeAsText(resultPath); - env.execute(); + List<String> result = nonPassingFlatMapDs.collect(); - expected = "\n"; + String expected = "\n"; + + compareResultAsText(result, expected); } public static class FlatMapper1 implements FlatMapFunction<String, String> { @@ -101,10 +83,9 @@ public class FlatMapITCase extends MultipleProgramsTestBase { DataSet<String> duplicatingFlatMapDs = ds. 
flatMap(new FlatMapper2()); - duplicatingFlatMapDs.writeAsText(resultPath); - env.execute(); + List<String> result = duplicatingFlatMapDs.collect(); - expected = "Hi\n" + "HI\n" + + String expected = "Hi\n" + "HI\n" + "Hello\n" + "HELLO\n" + "Hello world\n" + "HELLO WORLD\n" + "Hello world, how are you?\n" + "HELLO WORLD, HOW ARE YOU?\n" + @@ -112,6 +93,8 @@ public class FlatMapITCase extends MultipleProgramsTestBase { "Luke Skywalker\n" + "LUKE SKYWALKER\n" + "Random comment\n" + "RANDOM COMMENT\n" + "LOL\n" + "LOL\n"; + + compareResultAsText(result, expected); } public static class FlatMapper2 implements FlatMapFunction<String, String> { @@ -136,10 +119,9 @@ public class FlatMapITCase extends MultipleProgramsTestBase { DataSet<Tuple3<Integer, Long, String>> varyingTuplesMapDs = ds. flatMap(new FlatMapper3()); - varyingTuplesMapDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple3<Integer, Long, String>> result = varyingTuplesMapDs.collect(); - expected = "1,1,Hi\n" + + String expected = "1,1,Hi\n" + "2,2,Hello\n" + "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "5,3,I am fine.\n" + @@ -153,6 +135,8 @@ public class FlatMapITCase extends MultipleProgramsTestBase { "17,6,Comment#11\n" + "17,6,Comment#11\n" + "19,6,Comment#13\n" + "20,6,Comment#14\n" + "20,6,Comment#14\n"; + + compareResultAsTuples(result, expected); } public static class FlatMapper3 implements FlatMapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> { @@ -180,10 +164,9 @@ public class FlatMapITCase extends MultipleProgramsTestBase { DataSet<Tuple3<Integer, Long, String>> typeConversionFlatMapDs = ds. 
flatMap(new FlatMapper4()); - typeConversionFlatMapDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple3<Integer, Long, String>> result = typeConversionFlatMapDs.collect(); - expected = "1,0,Hi\n" + + String expected = "1,0,Hi\n" + "2,1,Hello\n" + "2,2,Hello world\n" + "3,3,Hello world, how are you?\n" + @@ -204,6 +187,8 @@ public class FlatMapITCase extends MultipleProgramsTestBase { "6,18,Comment#13\n" + "6,19,Comment#14\n" + "6,20,Comment#15\n"; + + compareResultAsTuples(result, expected); } public static class FlatMapper4 implements FlatMapFunction<CustomType, Tuple3<Integer, Long, String>> { @@ -212,8 +197,7 @@ public class FlatMapITCase extends MultipleProgramsTestBase { new Tuple3<Integer, Long, String>(); @Override - public void flatMap(CustomType value, Collector<Tuple3<Integer, Long, String>> out) - throws Exception { + public void flatMap(CustomType value, Collector<Tuple3<Integer, Long, String>> out) throws Exception { outTuple.setField(value.myInt, 0); outTuple.setField(value.myLong, 1); outTuple.setField(value.myString, 2); @@ -233,10 +217,10 @@ public class FlatMapITCase extends MultipleProgramsTestBase { DataSet<String> typeConversionFlatMapDs = ds. 
flatMap(new FlatMapper5()); - typeConversionFlatMapDs.writeAsText(resultPath); - env.execute(); + List<String> result = typeConversionFlatMapDs.collect(); - expected = "Hi\n" + "Hello\n" + "Hello world\n" + + String expected = "Hi\n" + "Hello\n" + "Hello world\n" + + "Hello world, how are you?\n" + "I am fine.\n" + "Luke Skywalker\n" + "Comment#1\n" + "Comment#2\n" + @@ -247,21 +231,21 @@ public class FlatMapITCase extends MultipleProgramsTestBase { "Comment#11\n" + "Comment#12\n" + "Comment#13\n" + "Comment#14\n" + "Comment#15\n"; + + compareResultAsText(result, expected); } public static class FlatMapper5 implements FlatMapFunction<Tuple3<Integer, Long, String>,String> { private static final long serialVersionUID = 1L; @Override - public void flatMap(Tuple3<Integer, Long, String> value, - Collector<String> out) throws Exception { + public void flatMap(Tuple3<Integer, Long, String> value, Collector<String> out) throws Exception { out.collect(value.f2); } } @Test - public void testFlatMapperIfUDFReturnsInputObjectMultipleTimesWhileChangingIt() throws - Exception { + public void testFlatMapperIfUDFReturnsInputObjectMultipleTimesWhileChangingIt() throws Exception { /* * Test flatmapper if UDF returns input object * multiple times and changes it in between @@ -273,10 +257,9 @@ public class FlatMapITCase extends MultipleProgramsTestBase { DataSet<Tuple3<Integer, Long, String>> inputObjFlatMapDs = ds. 
flatMap(new FlatMapper6()); - inputObjFlatMapDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple3<Integer, Long, String>> result = inputObjFlatMapDs.collect(); - expected = "0,1,Hi\n" + + String expected = "0,1,Hi\n" + "0,2,Hello\n" + "1,2,Hello\n" + "0,2,Hello world\n" + "1,2,Hello world\n" + "2,2,Hello world\n" + "0,3,I am fine.\n" + @@ -292,6 +275,8 @@ public class FlatMapITCase extends MultipleProgramsTestBase { "0,6,Comment#12\n" + "1,6,Comment#12\n" + "0,6,Comment#13\n" + "1,6,Comment#13\n" + "2,6,Comment#13\n" + "0,6,Comment#15\n"; + + compareResultAsTuples(result, expected); } public static class FlatMapper6 implements FlatMapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> { @@ -321,10 +306,9 @@ public class FlatMapITCase extends MultipleProgramsTestBase { DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); DataSet<Tuple3<Integer, Long, String>> bcFlatMapDs = ds. flatMap(new RichFlatMapper1()).withBroadcastSet(ints, "ints"); - bcFlatMapDs.writeAsCsv(resultPath); - env.execute(); + List<Tuple3<Integer, Long, String>> result = bcFlatMapDs.collect(); - expected = "55,1,Hi\n" + + String expected = "55,1,Hi\n" + "55,2,Hello\n" + "55,2,Hello world\n" + "55,3,Hello world, how are you?\n" + @@ -345,6 +329,8 @@ public class FlatMapITCase extends MultipleProgramsTestBase { "55,6,Comment#13\n" + "55,6,Comment#14\n" + "55,6,Comment#15\n"; + + compareResultAsTuples(result, expected); } public static class RichFlatMapper1 extends RichFlatMapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>> { @@ -370,5 +356,5 @@ public class FlatMapITCase extends MultipleProgramsTestBase { out.collect(outTuple); } } - + } http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java ---------------------------------------------------------------------- diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java index 3e9fde7..7e6de04 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java @@ -1,20 +1,20 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.flink.test.javaApiOperators; @@ -32,38 +32,29 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.apache.flink.util.Collector; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import java.util.ArrayList; import java.util.Arrays; - +import java.util.List; @SuppressWarnings("serial") @RunWith(Parameterized.class) /** -* The GroupCombine operator is not easy to test because it is essentially just a combiner. The result can be -* the result of a normal groupReduce at any stage its execution. The basic idea is to preserve the grouping key -* in the partial result, so that we can do a reduceGroup afterwards to finalize the results for verification. -* In addition, we can use hashPartition to partition the data and check if no shuffling (just combining) has -* been performed. -*/ + * The GroupCombine operator is not easy to test because it is essentially just a combiner. The result can be + * the result of a normal groupReduce at any stage its execution. The basic idea is to preserve the grouping key + * in the partial result, so that we can do a reduceGroup afterwards to finalize the results for verification. + * In addition, we can use hashPartition to partition the data and check if no shuffling (just combining) has + * been performed. 
+ */ public class GroupCombineITCase extends MultipleProgramsTestBase { public GroupCombineITCase(TestExecutionMode mode) { super(mode); } - private String resultPath; - - private String expected; - private static String identityResult = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + @@ -86,21 +77,6 @@ public class GroupCombineITCase extends MultipleProgramsTestBase { "20,6,Comment#14\n" + "21,6,Comment#15\n"; - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception { - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception { - if (expected != null) { - compareResultsByLinesInMemory(expected, resultPath); - } - } - @Test public void testAllGroupCombineIdentity() throws Exception { @@ -108,19 +84,15 @@ public class GroupCombineITCase extends MultipleProgramsTestBase { DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - DataSet<Tuple3<Integer, Long, String>> reduceDs = ds // combine .combineGroup(new IdentityFunction()) // fully reduce .reduceGroup(new IdentityFunction()); + List<Tuple3<Integer, Long, String>> result = reduceDs.collect(); - reduceDs.writeAsCsv(resultPath); - - env.execute(); - - expected = identityResult; + compareResultAsTuples(result, identityResult); } @Test @@ -136,11 +108,9 @@ public class GroupCombineITCase extends MultipleProgramsTestBase { // fully reduce .reduceGroup(new IdentityFunction()); - reduceDs.writeAsCsv(resultPath); - - env.execute(); + List<Tuple3<Integer, Long, String>> result = reduceDs.collect(); - expected = identityResult; + compareResultAsTuples(result, identityResult); } @Test @@ -157,12 +127,9 @@ public class GroupCombineITCase extends MultipleProgramsTestBase { // fully reduce .reduceGroup(new IdentityFunction()); + List<Tuple3<Integer, Long, String>> result = reduceDs.collect(); - reduceDs.writeAsCsv(resultPath); - - env.execute(); - - expected = identityResult; + 
compareResultAsTuples(result, identityResult); } @Test @@ -182,11 +149,9 @@ public class GroupCombineITCase extends MultipleProgramsTestBase { // fully reduce .reduceGroup(new IdentityFunction()); - reduceDs.writeAsCsv(resultPath); + List<Tuple3<Integer, Long, String>> result = reduceDs.collect(); - env.execute(); - - expected = identityResult; + compareResultAsTuples(result, identityResult); } @Test @@ -201,7 +166,7 @@ public class GroupCombineITCase extends MultipleProgramsTestBase { // wrap values as Kv pairs with the grouping key as key .map(new Tuple3KvWrapper()); - dsWrapped + List<Tuple3<Integer, Long, String>> result = dsWrapped .groupBy(0) // reduce partially .combineGroup(new Tuple3toTuple3GroupReduce()) @@ -214,19 +179,16 @@ public class GroupCombineITCase extends MultipleProgramsTestBase { public Tuple3<Integer, Long, String> map(Tuple2<Long, Tuple3<Integer, Long, String>> value) throws Exception { return value.f1; } - }) - .writeAsCsv(resultPath); - + }).collect(); - - env.execute(); - - expected = "1,1,combined\n" + + String expected = "1,1,combined\n" + "5,4,combined\n" + "15,9,combined\n" + "34,16,combined\n" + "65,25,combined\n" + "111,36,combined\n"; + + compareResultAsTuples(result, expected); } @Test @@ -241,33 +203,29 @@ public class GroupCombineITCase extends MultipleProgramsTestBase { // wrap values as Kv pairs with the grouping key as key .map(new Tuple3KvWrapper()); - dsWrapped + List<Tuple2<Integer, Long>> result = dsWrapped .groupBy(0) - // reduce partially + // reduce partially .combineGroup(new Tuple3toTuple2GroupReduce()) .groupBy(0) - // reduce fully to check result + // reduce fully to check result .reduceGroup(new Tuple2toTuple2GroupReduce()) - //unwrap + //unwrap .map(new MapFunction<Tuple2<Long,Tuple2<Integer,Long>>, Tuple2<Integer,Long>>() { @Override public Tuple2<Integer, Long> map(Tuple2<Long, Tuple2<Integer, Long>> value) throws Exception { return value.f1; } - }) - .writeAsCsv(resultPath); + }).collect(); - - - 
env.execute(); - - expected = "1,3\n" + + String expected = "1,3\n" + "5,20\n" + "15,58\n" + "34,52\n" + "65,70\n" + "111,96\n"; + compareResultAsTuples(result, expected); } @Test @@ -284,7 +242,9 @@ public class GroupCombineITCase extends MultipleProgramsTestBase { // partition and group data UnsortedGrouping<Tuple3<Integer, Long, String>> partitionedDS = ds.partitionByHash(0).groupBy(1); - partitionedDS.combineGroup(new GroupCombineFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>>() { + List<Tuple2<Long, Integer>> result = partitionedDS + .combineGroup( + new GroupCombineFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>>() { @Override public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Long, Integer>> out) throws Exception { int count = 0; @@ -295,29 +255,17 @@ public class GroupCombineITCase extends MultipleProgramsTestBase { } out.collect(new Tuple2(key, count)); } - }).writeAsCsv(resultPath); - - env.execute(); - - String notExpected = "6,6\n" + - "5,5\n" + - "4,4\n" + - "3,3\n" + - "2,2\n" + - "1,1\n"; + }).collect(); - // check + String[] localExpected = new String[] { "(6,6)", "(5,5)" + "(4,4)", "(3,3)", "(2,2)", "(1,1)" }; - ArrayList<String> list = new ArrayList<String>(); - readAllResultLines(list, resultPath); - - String[] result = list.toArray(new String[list.size()]); - Arrays.sort(result); - - String[] expected = notExpected.split("\n"); - Arrays.sort(expected); + String[] resultAsStringArray = new String[result.size()]; + for (int i = 0; i < resultAsStringArray.length; ++i) { + resultAsStringArray[i] = result.get(i).toString(); + } + Arrays.sort(resultAsStringArray); - Assert.assertEquals("The two arrays were identical.", false, Arrays.equals(expected, result)); + Assert.assertEquals("The two arrays were identical.", false, Arrays.equals(localExpected, resultAsStringArray)); } @Test @@ -334,28 +282,29 @@ public class GroupCombineITCase extends MultipleProgramsTestBase { // partition and group 
data UnsortedGrouping<Tuple3<Integer, Long, String>> partitionedDS = ds.partitionByHash(0).groupBy(1); - partitionedDS.combineGroup(new GroupCombineFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>>() { - @Override - public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Long, Integer>> out) throws Exception { - int count = 0; - long key = 0; - for (Tuple3<Integer, Long, String> value : values) { - key = value.f1; - count++; - } - out.collect(new Tuple2(key, count)); - } - }).writeAsCsv(resultPath); - - env.execute(); + List<Tuple2<Long, Integer>> result = partitionedDS + .combineGroup( + new GroupCombineFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>>() { + @Override + public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Long, Integer>> out) throws Exception { + int count = 0; + long key = 0; + for (Tuple3<Integer, Long, String> value : values) { + key = value.f1; + count++; + } + out.collect(new Tuple2(key, count)); + } + }).collect(); - expected = "6,6\n" + + String expected = "6,6\n" + "5,5\n" + "4,4\n" + "3,3\n" + "2,2\n" + "1,1\n"; + compareResultAsTuples(result, expected); } @Test @@ -373,15 +322,15 @@ public class GroupCombineITCase extends MultipleProgramsTestBase { // all methods on DataSet ds.combineGroup(new GroupCombineFunctionExample()) - .output(new DiscardingOutputFormat<Tuple1<String>>()); + .output(new DiscardingOutputFormat<Tuple1<String>>()); // all methods on UnsortedGrouping ds.groupBy(0).combineGroup(new GroupCombineFunctionExample()) - .output(new DiscardingOutputFormat<Tuple1<String>>()); + .output(new DiscardingOutputFormat<Tuple1<String>>()); // all methods on SortedGrouping ds.groupBy(0).sortGroup(0, Order.ASCENDING).combineGroup(new GroupCombineFunctionExample()) - .output(new DiscardingOutputFormat<Tuple1<String>>()); + .output(new DiscardingOutputFormat<Tuple1<String>>()); env.execute(); } @@ -407,7 +356,7 @@ public class GroupCombineITCase extends 
MultipleProgramsTestBase { } public static class IdentityFunction implements GroupCombineFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>, - GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> { + GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> { @Override public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) throws Exception { @@ -427,6 +376,7 @@ public class GroupCombineITCase extends MultipleProgramsTestBase { public static class Tuple3toTuple3GroupReduce implements KvGroupReduce<Long, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> { + @Override public void combine(Iterable<Tuple2<Long, Tuple3<Integer, Long, String>>> values, Collector<Tuple2<Long, Tuple3<Integer, Long, String>>> out) throws Exception { int i = 0; long l = 0; @@ -478,6 +428,7 @@ public class GroupCombineITCase extends MultipleProgramsTestBase { public static class Tuple2toTuple2GroupReduce implements KvGroupReduce<Long, Tuple2<Integer, Long>, Tuple2<Integer, Long>, Tuple2<Integer, Long>> { + @Override public void combine(Iterable<Tuple2<Long, Tuple2<Integer, Long>>> values, Collector<Tuple2<Long, Tuple2<Integer, Long>>> out) throws Exception { int i = 0; long l = 0; @@ -516,5 +467,4 @@ public class GroupCombineITCase extends MultipleProgramsTestBase { public interface KvGroupReduce<K, V, INT, OUT> extends CombineAndReduceGroup<Tuple2<K, V>, Tuple2<K, INT>, Tuple2<K, OUT>> { } - -} \ No newline at end of file +}
