Repository: flink
Updated Branches:
  refs/heads/master b566e484a -> 597d8b862


[FLINK-1544] [streaming] POJO types added to AggregationFunctionTest

This closes #517


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/597d8b86
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/597d8b86
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/597d8b86

Branch: refs/heads/master
Commit: 597d8b86276fd5b8c501658683758816365c3edb
Parents: b566e48
Author: szape <[email protected]>
Authored: Wed Mar 11 15:36:15 2015 +0100
Committer: mbalassi <[email protected]>
Committed: Wed Mar 25 10:19:55 2015 +0100

----------------------------------------------------------------------
 .../streaming/api/AggregationFunctionTest.java  | 293 ++++++++++++++-----
 1 file changed, 217 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/597d8b86/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index 115f614..bc0023a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -17,12 +17,6 @@
 
 package org.apache.flink.streaming.api;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -40,6 +34,13 @@ import org.apache.flink.streaming.util.MockContext;
 import org.apache.flink.streaming.util.keys.KeySelectorUtil;
 import org.junit.Test;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
 public class AggregationFunctionTest {
 
        @Test
@@ -73,15 +74,15 @@ public class AggregationFunctionTest {
 
                        int groupedSum;
                        switch (i % 3) {
-                       case 0:
-                               groupedSum = groupedSum0 += i;
-                               break;
-                       case 1:
-                               groupedSum = groupedSum1 += i;
-                               break;
-                       default:
-                               groupedSum = groupedSum2 += i;
-                               break;
+                               case 0:
+                                       groupedSum = groupedSum0 += i;
+                                       break;
+                               case 1:
+                                       groupedSum = groupedSum1 += i;
+                                       break;
+                               default:
+                                       groupedSum = groupedSum2 += i;
+                                       break;
                        }
 
                        expectedGroupSumList.add(new Tuple2<Integer, Integer>(i % 3, groupedSum));
@@ -118,7 +119,7 @@ public class AggregationFunctionTest {
                                .getForObject(new Tuple2<Integer, Integer>(1, 1));
 
                KeySelector<Tuple2<Integer, Integer>, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
-                               new Keys.ExpressionKeys<Tuple2<Integer, Integer>>(new int[] { 0 }, typeInfo),
+                               new Keys.ExpressionKeys<Tuple2<Integer, Integer>>(new int[]{0}, typeInfo),
                                typeInfo, new ExecutionConfig());
 
                List<Tuple2<Integer, Integer>> groupedSumList = MockContext.createAndExecute(
@@ -166,73 +167,101 @@ public class AggregationFunctionTest {
                        // Nothing to do here
                }
 
-               ReduceFunction<Tuple2<Integer, Integer>> maxByFunctionFirst = ComparableAggregator
-                               .getAggregator(0, type1, AggregationType.MAXBY, true);
-               ReduceFunction<Tuple2<Integer, Integer>> maxByFunctionLast = ComparableAggregator
-                               .getAggregator(0, type1, AggregationType.MAXBY, false);
+       }
 
-               ReduceFunction<Tuple2<Integer, Integer>> minByFunctionFirst = ComparableAggregator
-                               .getAggregator(0, type1, AggregationType.MINBY, true);
-               ReduceFunction<Tuple2<Integer, Integer>> minByFunctionLast = ComparableAggregator
-                               .getAggregator(0, type1, AggregationType.MINBY, false);
+       @Test
+       public void pojoGroupSumIntegerTest() {
+               List<MyPojo> expectedSumList = new ArrayList<MyPojo>();
+               List<MyPojo> expectedMinList = new ArrayList<MyPojo>();
+               List<MyPojo> expectedMaxList = new ArrayList<MyPojo>();
+               List<Integer> expectedSumList0 = new ArrayList<Integer>();
+               List<Integer> expectedMinList0 = new ArrayList<Integer>();
+               List<Integer> expectedMaxList0 = new ArrayList<Integer>();
+               List<MyPojo> expectedGroupSumList = new ArrayList<MyPojo>();
+               List<MyPojo> expectedGroupMinList = new ArrayList<MyPojo>();
+               List<MyPojo> expectedGroupMaxList = new ArrayList<MyPojo>();
 
-               List<Tuple2<Integer, Integer>> maxByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>();
-               maxByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-               maxByFirstExpected.add(new Tuple2<Integer, Integer>(1, 1));
-               maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-               maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-               maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-               maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-               maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-               maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-               maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
+               List<Integer> simpleInput = new ArrayList<Integer>();
 
-               List<Tuple2<Integer, Integer>> maxByLastExpected = new ArrayList<Tuple2<Integer, Integer>>();
-               maxByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
-               maxByLastExpected.add(new Tuple2<Integer, Integer>(1, 1));
-               maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
-               maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
-               maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
-               maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
-               maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
-               maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
-               maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 8));
+               int groupedSum0 = 0;
+               int groupedSum1 = 0;
+               int groupedSum2 = 0;
 
-               List<Tuple2<Integer, Integer>> minByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>();
-               minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-               minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-               minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-               minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-               minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-               minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-               minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-               minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-               minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+               for (int i = 0; i < 9; i++) {
+                       simpleInput.add(i);
+                       expectedSumList.add(new MyPojo(i % 3, (i + 1) * i / 2));
+                       expectedMinList.add(new MyPojo(i % 3, 0));
+                       expectedMaxList.add(new MyPojo(i % 3, i));
 
-               List<Tuple2<Integer, Integer>> minByLastExpected = new ArrayList<Tuple2<Integer, Integer>>();
-               minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
-               minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
-               minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
-               minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
-               minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
-               minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
-               minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
-               minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
-               minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
+                       expectedSumList0.add((i + 1) * i / 2);
+                       expectedMaxList0.add(i);
+                       expectedMinList0.add(0);
 
-               assertEquals(maxByFirstExpected, MockContext.createAndExecute(
-                               new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionFirst),
-                               getInputList()));
-               assertEquals(maxByLastExpected, MockContext.createAndExecute(
-                               new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionLast),
-                               getInputList()));
-               assertEquals(minByLastExpected, MockContext.createAndExecute(
-                               new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionLast),
-                               getInputList()));
-               assertEquals(minByFirstExpected, MockContext.createAndExecute(
-                               new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionFirst),
-                               getInputList()));
+                       int groupedSum;
+                       switch (i % 3) {
+                               case 0:
+                                       groupedSum = groupedSum0 += i;
+                                       break;
+                               case 1:
+                                       groupedSum = groupedSum1 += i;
+                                       break;
+                               default:
+                                       groupedSum = groupedSum2 += i;
+                                       break;
+                       }
+
+                       expectedGroupSumList.add(new MyPojo(i % 3, groupedSum));
+                       expectedGroupMinList.add(new MyPojo(i % 3, i % 3));
+                       expectedGroupMaxList.add(new MyPojo(i % 3, i));
+               }
 
+               TypeInformation<MyPojo> type1 = TypeExtractor.getForObject(new MyPojo(0, 0));
+               TypeInformation<Integer> type2 = TypeExtractor.getForObject(0);
+               ExecutionConfig config = new ExecutionConfig();
+
+               ReduceFunction<MyPojo> sumFunction = SumAggregator.getSumFunction("f1", type1, config);
+               ReduceFunction<Integer> sumFunction0 = SumAggregator.getSumFunction(0, Integer.class, type2);
+               ReduceFunction<MyPojo> minFunction = ComparableAggregator.getAggregator("f1", type1, AggregationType.MIN,
+                               false, config);
+               ReduceFunction<Integer> minFunction0 = ComparableAggregator.getAggregator(0, type2, AggregationType.MIN);
+               ReduceFunction<MyPojo> maxFunction = ComparableAggregator.getAggregator("f1", type1, AggregationType.MAX,
+                               false, config);
+               ReduceFunction<Integer> maxFunction0 = ComparableAggregator.getAggregator(0, type2, AggregationType.MAX);
+
+               List<MyPojo> sumList = MockContext.createAndExecute(
+                               new StreamReduceInvokable<MyPojo>(sumFunction), getInputPojoList());
+               List<MyPojo> minList = MockContext.createAndExecute(
+                               new StreamReduceInvokable<MyPojo>(minFunction), getInputPojoList());
+               List<MyPojo> maxList = MockContext.createAndExecute(
+                               new StreamReduceInvokable<MyPojo>(maxFunction), getInputPojoList());
+
+               TypeInformation<MyPojo> typeInfo = TypeExtractor.getForObject(new MyPojo(1, 1));
+               KeySelector<MyPojo, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
+                               new Keys.ExpressionKeys<MyPojo>(new String[]{"f0"}, typeInfo),
+                               typeInfo, config);
+
+               List<MyPojo> groupedSumList = MockContext.createAndExecute(
+                               new GroupedReduceInvokable<MyPojo>(sumFunction, keySelector),
+                               getInputPojoList());
+               List<MyPojo> groupedMinList = MockContext.createAndExecute(
+                               new GroupedReduceInvokable<MyPojo>(minFunction, keySelector),
+                               getInputPojoList());
+               List<MyPojo> groupedMaxList = MockContext.createAndExecute(
+                               new GroupedReduceInvokable<MyPojo>(maxFunction, keySelector),
+                               getInputPojoList());
+
+               assertEquals(expectedSumList, sumList);
+               assertEquals(expectedMinList, minList);
+               assertEquals(expectedMaxList, maxList);
+               assertEquals(expectedGroupSumList, groupedSumList);
+               assertEquals(expectedGroupMinList, groupedMinList);
+               assertEquals(expectedGroupMaxList, groupedMaxList);
+               assertEquals(expectedSumList0, MockContext.createAndExecute(
+                               new StreamReduceInvokable<Integer>(sumFunction0), simpleInput));
+               assertEquals(expectedMinList0, MockContext.createAndExecute(
+                               new StreamReduceInvokable<Integer>(minFunction0), simpleInput));
+               assertEquals(expectedMaxList0, MockContext.createAndExecute(
+                               new StreamReduceInvokable<Integer>(maxFunction0), simpleInput));
        }
 
        @Test
@@ -308,6 +337,80 @@ public class AggregationFunctionTest {
                                getInputList()));
        }
 
+       @Test
+       public void pojoMinMaxByTest() {
+               ExecutionConfig config = new ExecutionConfig();
+               TypeInformation<MyPojo> type1 = TypeExtractor
+                               .getForObject(new MyPojo(0, 0));
+
+               ReduceFunction<MyPojo> maxByFunctionFirst = ComparableAggregator
+                               .getAggregator("f0", type1, AggregationType.MAXBY, true, config);
+               ReduceFunction<MyPojo> maxByFunctionLast = ComparableAggregator
+                               .getAggregator("f0", type1, AggregationType.MAXBY, false, config);
+
+               ReduceFunction<MyPojo> minByFunctionFirst = ComparableAggregator
+                               .getAggregator("f0", type1, AggregationType.MINBY, true, config);
+               ReduceFunction<MyPojo> minByFunctionLast = ComparableAggregator
+                               .getAggregator("f0", type1, AggregationType.MINBY, false, config);
+
+               List<MyPojo> maxByFirstExpected = new ArrayList<MyPojo>();
+               maxByFirstExpected.add(new MyPojo(0, 0));
+               maxByFirstExpected.add(new MyPojo(1, 1));
+               maxByFirstExpected.add(new MyPojo(2, 2));
+               maxByFirstExpected.add(new MyPojo(2, 2));
+               maxByFirstExpected.add(new MyPojo(2, 2));
+               maxByFirstExpected.add(new MyPojo(2, 2));
+               maxByFirstExpected.add(new MyPojo(2, 2));
+               maxByFirstExpected.add(new MyPojo(2, 2));
+               maxByFirstExpected.add(new MyPojo(2, 2));
+
+               List<MyPojo> maxByLastExpected = new ArrayList<MyPojo>();
+               maxByLastExpected.add(new MyPojo(0, 0));
+               maxByLastExpected.add(new MyPojo(1, 1));
+               maxByLastExpected.add(new MyPojo(2, 2));
+               maxByLastExpected.add(new MyPojo(2, 2));
+               maxByLastExpected.add(new MyPojo(2, 2));
+               maxByLastExpected.add(new MyPojo(2, 5));
+               maxByLastExpected.add(new MyPojo(2, 5));
+               maxByLastExpected.add(new MyPojo(2, 5));
+               maxByLastExpected.add(new MyPojo(2, 8));
+
+               List<MyPojo> minByFirstExpected = new ArrayList<MyPojo>();
+               minByFirstExpected.add(new MyPojo(0, 0));
+               minByFirstExpected.add(new MyPojo(0, 0));
+               minByFirstExpected.add(new MyPojo(0, 0));
+               minByFirstExpected.add(new MyPojo(0, 0));
+               minByFirstExpected.add(new MyPojo(0, 0));
+               minByFirstExpected.add(new MyPojo(0, 0));
+               minByFirstExpected.add(new MyPojo(0, 0));
+               minByFirstExpected.add(new MyPojo(0, 0));
+               minByFirstExpected.add(new MyPojo(0, 0));
+
+               List<MyPojo> minByLastExpected = new ArrayList<MyPojo>();
+               minByLastExpected.add(new MyPojo(0, 0));
+               minByLastExpected.add(new MyPojo(0, 0));
+               minByLastExpected.add(new MyPojo(0, 0));
+               minByLastExpected.add(new MyPojo(0, 3));
+               minByLastExpected.add(new MyPojo(0, 3));
+               minByLastExpected.add(new MyPojo(0, 3));
+               minByLastExpected.add(new MyPojo(0, 6));
+               minByLastExpected.add(new MyPojo(0, 6));
+               minByLastExpected.add(new MyPojo(0, 6));
+
+               assertEquals(maxByFirstExpected, MockContext.createAndExecute(
+                               new StreamReduceInvokable<MyPojo>(maxByFunctionFirst),
+                               getInputPojoList()));
+               assertEquals(maxByLastExpected, MockContext.createAndExecute(
+                               new StreamReduceInvokable<MyPojo>(maxByFunctionLast),
+                               getInputPojoList()));
+               assertEquals(minByLastExpected, MockContext.createAndExecute(
+                               new StreamReduceInvokable<MyPojo>(minByFunctionLast),
+                               getInputPojoList()));
+               assertEquals(minByFirstExpected, MockContext.createAndExecute(
+                               new StreamReduceInvokable<MyPojo>(minByFunctionFirst),
+                               getInputPojoList()));
+       }
+
        private List<Tuple2<Integer, Integer>> getInputList() {
                ArrayList<Tuple2<Integer, Integer>> inputList = new ArrayList<Tuple2<Integer, Integer>>();
                for (int i = 0; i < 9; i++) {
@@ -316,4 +419,42 @@ public class AggregationFunctionTest {
                return inputList;
 
        }
+
+       private List<MyPojo> getInputPojoList() {
+               ArrayList<MyPojo> inputList = new ArrayList<MyPojo>();
+               for (int i = 0; i < 9; i++) {
+                       inputList.add(new MyPojo(i % 3, i));
+               }
+               return inputList;
+
+       }
+
+       public static class MyPojo implements Serializable {
+
+               public int f0;
+               public int f1;
+
+               public MyPojo(int f0, int f1) {
+                       this.f0 = f0;
+                       this.f1 = f1;
+               }
+
+               public MyPojo() {
+               }
+
+               @Override
+               public String toString() {
+                       return "POJO(" + f0 + "," + f1 + ")";
+               }
+
+               @Override
+               public boolean equals(Object other) {
+                       if (other instanceof MyPojo) {
+                               return this.f0 == ((MyPojo) other).f0 && this.f1 == ((MyPojo) other).f1;
+                       } else {
+                               return false;
+                       }
+               }
+
+       }
 }

Reply via email to