Repository: flink Updated Branches: refs/heads/master b566e484a -> 597d8b862
[FLINK-1544] [streaming] POJO types added to AggregationFunctionTest This closes #517 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/597d8b86 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/597d8b86 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/597d8b86 Branch: refs/heads/master Commit: 597d8b86276fd5b8c501658683758816365c3edb Parents: b566e48 Author: szape <[email protected]> Authored: Wed Mar 11 15:36:15 2015 +0100 Committer: mbalassi <[email protected]> Committed: Wed Mar 25 10:19:55 2015 +0100 ---------------------------------------------------------------------- .../streaming/api/AggregationFunctionTest.java | 293 ++++++++++++++----- 1 file changed, 217 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/597d8b86/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java index 115f614..bc0023a 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java @@ -17,12 +17,6 @@ package org.apache.flink.streaming.api; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import java.util.ArrayList; -import java.util.List; - import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -40,6 +34,13 @@ import org.apache.flink.streaming.util.MockContext; import org.apache.flink.streaming.util.keys.KeySelectorUtil; import org.junit.Test; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + public class AggregationFunctionTest { @Test @@ -73,15 +74,15 @@ public class AggregationFunctionTest { int groupedSum; switch (i % 3) { - case 0: - groupedSum = groupedSum0 += i; - break; - case 1: - groupedSum = groupedSum1 += i; - break; - default: - groupedSum = groupedSum2 += i; - break; + case 0: + groupedSum = groupedSum0 += i; + break; + case 1: + groupedSum = groupedSum1 += i; + break; + default: + groupedSum = groupedSum2 += i; + break; } expectedGroupSumList.add(new Tuple2<Integer, Integer>(i % 3, groupedSum)); @@ -118,7 +119,7 @@ public class AggregationFunctionTest { .getForObject(new Tuple2<Integer, Integer>(1, 1)); KeySelector<Tuple2<Integer, Integer>, ?> keySelector = KeySelectorUtil.getSelectorForKeys( - new Keys.ExpressionKeys<Tuple2<Integer, Integer>>(new int[] { 0 }, typeInfo), + new Keys.ExpressionKeys<Tuple2<Integer, Integer>>(new int[]{0}, typeInfo), typeInfo, new ExecutionConfig()); List<Tuple2<Integer, Integer>> groupedSumList = MockContext.createAndExecute( @@ -166,73 +167,101 @@ public class AggregationFunctionTest { // Nothing to do here } - ReduceFunction<Tuple2<Integer, Integer>> maxByFunctionFirst = ComparableAggregator - .getAggregator(0, type1, AggregationType.MAXBY, true); - ReduceFunction<Tuple2<Integer, Integer>> maxByFunctionLast = ComparableAggregator - .getAggregator(0, type1, AggregationType.MAXBY, false); + } - ReduceFunction<Tuple2<Integer, Integer>> minByFunctionFirst = ComparableAggregator - .getAggregator(0, type1, AggregationType.MINBY, true); - ReduceFunction<Tuple2<Integer, Integer>> minByFunctionLast = ComparableAggregator - .getAggregator(0, type1, AggregationType.MINBY, false); + @Test + public void pojoGroupSumIntegerTest() { + List<MyPojo> expectedSumList = new ArrayList<MyPojo>(); + List<MyPojo> expectedMinList = new ArrayList<MyPojo>(); + List<MyPojo> expectedMaxList = new ArrayList<MyPojo>(); + List<Integer> expectedSumList0 = new ArrayList<Integer>(); + List<Integer> expectedMinList0 = new ArrayList<Integer>(); + List<Integer> expectedMaxList0 = new ArrayList<Integer>(); + List<MyPojo> expectedGroupSumList = new ArrayList<MyPojo>(); + List<MyPojo> expectedGroupMinList = new ArrayList<MyPojo>(); + List<MyPojo> expectedGroupMaxList = new ArrayList<MyPojo>(); - List<Tuple2<Integer, Integer>> maxByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>(); - maxByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0)); - maxByFirstExpected.add(new Tuple2<Integer, Integer>(1, 1)); - maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2)); - maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2)); - maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2)); - maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2)); - maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2)); - maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2)); - maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2)); + List<Integer> simpleInput = new ArrayList<Integer>(); - List<Tuple2<Integer, Integer>> maxByLastExpected = new ArrayList<Tuple2<Integer, Integer>>(); - maxByLastExpected.add(new Tuple2<Integer, Integer>(0, 0)); - maxByLastExpected.add(new Tuple2<Integer, Integer>(1, 1)); - maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2)); - maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2)); - maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2)); - maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5)); - maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5)); - maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5)); - maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 8)); + int groupedSum0 = 0; + int groupedSum1 = 0; + int groupedSum2 = 0; - List<Tuple2<Integer, Integer>> minByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>(); - minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0)); - minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0)); - minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0)); - minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0)); - minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0)); - minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0)); - minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0)); - minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0)); - minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0)); + for (int i = 0; i < 9; i++) { + simpleInput.add(i); + expectedSumList.add(new MyPojo(i % 3, (i + 1) * i / 2)); + expectedMinList.add(new MyPojo(i % 3, 0)); + expectedMaxList.add(new MyPojo(i % 3, i)); - List<Tuple2<Integer, Integer>> minByLastExpected = new ArrayList<Tuple2<Integer, Integer>>(); - minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0)); - minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0)); - minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0)); - minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3)); - minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3)); - minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3)); - minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6)); - minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6)); - minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6)); + expectedSumList0.add((i + 1) * i / 2); + expectedMaxList0.add(i); + expectedMinList0.add(0); - assertEquals(maxByFirstExpected, MockContext.createAndExecute( - new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionFirst), - getInputList())); - assertEquals(maxByLastExpected, MockContext.createAndExecute( - new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionLast), - getInputList())); - assertEquals(minByLastExpected, MockContext.createAndExecute( - new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionLast), - getInputList())); - assertEquals(minByFirstExpected, MockContext.createAndExecute( - new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionFirst), - getInputList())); + int groupedSum; + switch (i % 3) { + case 0: + groupedSum = groupedSum0 += i; + break; + case 1: + groupedSum = groupedSum1 += i; + break; + default: + groupedSum = groupedSum2 += i; + break; + } + + expectedGroupSumList.add(new MyPojo(i % 3, groupedSum)); + expectedGroupMinList.add(new MyPojo(i % 3, i % 3)); + expectedGroupMaxList.add(new MyPojo(i % 3, i)); + } + TypeInformation<MyPojo> type1 = TypeExtractor.getForObject(new MyPojo(0, 0)); + TypeInformation<Integer> type2 = TypeExtractor.getForObject(0); + ExecutionConfig config = new ExecutionConfig(); + + ReduceFunction<MyPojo> sumFunction = SumAggregator.getSumFunction("f1", type1, config); + ReduceFunction<Integer> sumFunction0 = SumAggregator.getSumFunction(0, Integer.class, type2); + ReduceFunction<MyPojo> minFunction = ComparableAggregator.getAggregator("f1", type1, AggregationType.MIN, + false, config); + ReduceFunction<Integer> minFunction0 = ComparableAggregator.getAggregator(0, type2, AggregationType.MIN); + ReduceFunction<MyPojo> maxFunction = ComparableAggregator.getAggregator("f1", type1, AggregationType.MAX, + false, config); + ReduceFunction<Integer> maxFunction0 = ComparableAggregator.getAggregator(0, type2, AggregationType.MAX); + + List<MyPojo> sumList = MockContext.createAndExecute( + new StreamReduceInvokable<MyPojo>(sumFunction), getInputPojoList()); + List<MyPojo> minList = MockContext.createAndExecute( + new StreamReduceInvokable<MyPojo>(minFunction), getInputPojoList()); + List<MyPojo> maxList = MockContext.createAndExecute( + new StreamReduceInvokable<MyPojo>(maxFunction), getInputPojoList()); + + TypeInformation<MyPojo> typeInfo = TypeExtractor.getForObject(new MyPojo(1, 1)); + KeySelector<MyPojo, ?> keySelector = KeySelectorUtil.getSelectorForKeys( + new Keys.ExpressionKeys<MyPojo>(new String[]{"f0"}, typeInfo), + typeInfo, config); + + List<MyPojo> groupedSumList = MockContext.createAndExecute( + new GroupedReduceInvokable<MyPojo>(sumFunction, keySelector), + getInputPojoList()); + List<MyPojo> groupedMinList = MockContext.createAndExecute( + new GroupedReduceInvokable<MyPojo>(minFunction, keySelector), + getInputPojoList()); + List<MyPojo> groupedMaxList = MockContext.createAndExecute( + new GroupedReduceInvokable<MyPojo>(maxFunction, keySelector), + getInputPojoList()); + + assertEquals(expectedSumList, sumList); + assertEquals(expectedMinList, minList); + assertEquals(expectedMaxList, maxList); + assertEquals(expectedGroupSumList, groupedSumList); + assertEquals(expectedGroupMinList, groupedMinList); + assertEquals(expectedGroupMaxList, groupedMaxList); + assertEquals(expectedSumList0, MockContext.createAndExecute( + new StreamReduceInvokable<Integer>(sumFunction0), simpleInput)); + assertEquals(expectedMinList0, MockContext.createAndExecute( + new StreamReduceInvokable<Integer>(minFunction0), simpleInput)); + assertEquals(expectedMaxList0, MockContext.createAndExecute( + new StreamReduceInvokable<Integer>(maxFunction0), simpleInput)); } @Test @@ -308,6 +337,80 @@ public class AggregationFunctionTest { getInputList())); } + @Test + public void pojoMinMaxByTest() { + ExecutionConfig config = new ExecutionConfig(); + TypeInformation<MyPojo> type1 = TypeExtractor + .getForObject(new MyPojo(0, 0)); + + ReduceFunction<MyPojo> maxByFunctionFirst = ComparableAggregator + .getAggregator("f0", type1, AggregationType.MAXBY, true, config); + ReduceFunction<MyPojo> maxByFunctionLast = ComparableAggregator + .getAggregator("f0", type1, AggregationType.MAXBY, false, config); + + ReduceFunction<MyPojo> minByFunctionFirst = ComparableAggregator + .getAggregator("f0", type1, AggregationType.MINBY, true, config); + ReduceFunction<MyPojo> minByFunctionLast = ComparableAggregator + .getAggregator("f0", type1, AggregationType.MINBY, false, config); + + List<MyPojo> maxByFirstExpected = new ArrayList<MyPojo>(); + maxByFirstExpected.add(new MyPojo(0, 0)); + maxByFirstExpected.add(new MyPojo(1, 1)); + maxByFirstExpected.add(new MyPojo(2, 2)); + maxByFirstExpected.add(new MyPojo(2, 2)); + maxByFirstExpected.add(new MyPojo(2, 2)); + maxByFirstExpected.add(new MyPojo(2, 2)); + maxByFirstExpected.add(new MyPojo(2, 2)); + maxByFirstExpected.add(new MyPojo(2, 2)); + maxByFirstExpected.add(new MyPojo(2, 2)); + + List<MyPojo> maxByLastExpected = new ArrayList<MyPojo>(); + maxByLastExpected.add(new MyPojo(0, 0)); + maxByLastExpected.add(new MyPojo(1, 1)); + maxByLastExpected.add(new MyPojo(2, 2)); + maxByLastExpected.add(new MyPojo(2, 2)); + maxByLastExpected.add(new MyPojo(2, 2)); + maxByLastExpected.add(new MyPojo(2, 5)); + maxByLastExpected.add(new MyPojo(2, 5)); + maxByLastExpected.add(new MyPojo(2, 5)); + maxByLastExpected.add(new MyPojo(2, 8)); + + List<MyPojo> minByFirstExpected = new ArrayList<MyPojo>(); + minByFirstExpected.add(new MyPojo(0, 0)); + minByFirstExpected.add(new MyPojo(0, 0)); + minByFirstExpected.add(new MyPojo(0, 0)); + minByFirstExpected.add(new MyPojo(0, 0)); + minByFirstExpected.add(new MyPojo(0, 0)); + minByFirstExpected.add(new MyPojo(0, 0)); + minByFirstExpected.add(new MyPojo(0, 0)); + minByFirstExpected.add(new MyPojo(0, 0)); + minByFirstExpected.add(new MyPojo(0, 0)); + + List<MyPojo> minByLastExpected = new ArrayList<MyPojo>(); + minByLastExpected.add(new MyPojo(0, 0)); + minByLastExpected.add(new MyPojo(0, 0)); + minByLastExpected.add(new MyPojo(0, 0)); + minByLastExpected.add(new MyPojo(0, 3)); + minByLastExpected.add(new MyPojo(0, 3)); + minByLastExpected.add(new MyPojo(0, 3)); + minByLastExpected.add(new MyPojo(0, 6)); + minByLastExpected.add(new MyPojo(0, 6)); + minByLastExpected.add(new MyPojo(0, 6)); + + assertEquals(maxByFirstExpected, MockContext.createAndExecute( + new StreamReduceInvokable<MyPojo>(maxByFunctionFirst), + getInputPojoList())); + assertEquals(maxByLastExpected, MockContext.createAndExecute( + new StreamReduceInvokable<MyPojo>(maxByFunctionLast), + getInputPojoList())); + assertEquals(minByLastExpected, MockContext.createAndExecute( + new StreamReduceInvokable<MyPojo>(minByFunctionLast), + getInputPojoList())); + assertEquals(minByFirstExpected, MockContext.createAndExecute( + new StreamReduceInvokable<MyPojo>(minByFunctionFirst), + getInputPojoList())); + } + private List<Tuple2<Integer, Integer>> getInputList() { ArrayList<Tuple2<Integer, Integer>> inputList = new ArrayList<Tuple2<Integer, Integer>>(); for (int i = 0; i < 9; i++) { @@ -316,4 +419,42 @@ public class AggregationFunctionTest { return inputList; } + + private List<MyPojo> getInputPojoList() { + ArrayList<MyPojo> inputList = new ArrayList<MyPojo>(); + for (int i = 0; i < 9; i++) { + inputList.add(new MyPojo(i % 3, i)); + } + return inputList; + + } + + public static class MyPojo implements Serializable { + + public int f0; + public int f1; + + public MyPojo(int f0, int f1) { + this.f0 = f0; + this.f1 = f1; + } + + public MyPojo() { + } + + @Override + public String toString() { + return "POJO(" + f0 + "," + f1 + ")"; + } + + @Override + public boolean equals(Object other) { + if (other instanceof MyPojo) { + return this.f0 == ((MyPojo) other).f0 && this.f1 == ((MyPojo) other).f1; + } else { + return false; + } + } + + } }
