Repository: flink
Updated Branches:
  refs/heads/master 0dea359b3 -> 0081fb2ef


[FLINK-2135] Fix faulty cast to GroupReduceFunction

This closes #769


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0081fb2e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0081fb2e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0081fb2e

Branch: refs/heads/master
Commit: 0081fb2ef2bd03d06a786dd8988865d2ff6168c2
Parents: 0dea359
Author: Robert Metzger <[email protected]>
Authored: Wed Jun 3 15:52:12 2015 +0200
Committer: Robert Metzger <[email protected]>
Committed: Thu Jun 4 11:48:06 2015 +0200

----------------------------------------------------------------------
 ...PlanUnwrappingSortedReduceGroupOperator.java |  8 +++---
 .../test/javaApiOperators/FirstNITCase.java     | 29 ++++++++++++++++++++
 2 files changed, 33 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0081fb2e/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
index 63ebfa4..e4d41f4 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
@@ -37,7 +37,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, 
K1, K2> extends Gr
        public PlanUnwrappingSortedReduceGroupOperator(GroupReduceFunction<IN, 
OUT> udf, Keys.SelectorFunctionKeys<IN, K1> groupingKey, 
Keys.SelectorFunctionKeys<IN, K2> sortingKey, String name,
                                                                                
        TypeInformation<OUT> outType, TypeInformation<Tuple3<K1, K2, IN>> 
typeInfoWithKey, boolean combinable)
        {
-               super(combinable ? new 
TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, 
K2>((RichGroupReduceFunction<IN, OUT>) udf) : new 
TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K1, K2>(udf),
+               super(combinable ? new 
TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2>(udf) : new 
TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K1, K2>(udf),
                        new UnaryOperatorInformation<Tuple3<K1, K2, IN>, 
OUT>(typeInfoWithKey, outType), groupingKey.computeLogicalKeyPositions(), name);
 
                super.setCombinable(combinable);
@@ -46,7 +46,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, 
K1, K2> extends Gr
        // 
--------------------------------------------------------------------------------------------
 
        @RichGroupReduceFunction.Combinable
-       public static final class 
TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2> extends 
WrappingFunction<RichGroupReduceFunction<IN, OUT>>
+       public static final class 
TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2> extends 
WrappingFunction<GroupReduceFunction<IN, OUT>>
                implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>, 
GroupCombineFunction<Tuple3<K1, K2, IN>, Tuple3<K1, K2, IN>>
        {
 
@@ -55,7 +55,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, 
K1, K2> extends Gr
                private Tuple3UnwrappingIterator<IN, K1, K2> iter;
                private Tuple3WrappingCollector<IN, K1, K2> coll;
 
-               private 
TupleUnwrappingGroupCombinableGroupReducer(RichGroupReduceFunction<IN, OUT> 
wrapped) {
+               private 
TupleUnwrappingGroupCombinableGroupReducer(GroupReduceFunction<IN, OUT> 
wrapped) {
                        super(wrapped);
                        this.iter = new Tuple3UnwrappingIterator<IN, K1, K2>();
                        this.coll = new Tuple3WrappingCollector<IN, K1, 
K2>(this.iter);
@@ -72,7 +72,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, 
K1, K2> extends Gr
                public void combine(Iterable<Tuple3<K1, K2, IN>> values, 
Collector<Tuple3<K1, K2, IN>> out) throws Exception {
                        iter.set(values.iterator());
                        coll.set(out);
-                       this.wrappedFunction.combine(iter, coll);
+                       
((GroupCombineFunction)this.wrappedFunction).combine(iter, coll);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0081fb2e/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
index feb1169..15d98dd 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -114,6 +116,33 @@ public class FirstNITCase extends MultipleProgramsTestBase 
{
                                + "(5,15)\n(5,14)\n(5,13)\n"
                                + "(6,21)\n(6,20)\n(6,19)\n";
        }
+
+       /**
+        * Test for FLINK-2135
+        */
+       @Test
+       public void testFaultyCast() throws Exception {
+               ExecutionEnvironment ee = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<String> b = ee.fromElements("a", "b");
+               GroupReduceOperator<String, String> a = b.groupBy(new 
KeySelector<String, Long>() {
+                       @Override
+                       public Long getKey(String value) throws Exception {
+                               return 1L;
+                       }
+               }).sortGroup(new KeySelector<String, Double>() {
+                       @Override
+                       public Double getKey(String value) throws Exception {
+                               return 1.0;
+                       }
+               }, Order.DESCENDING).first(1);
+
+               b.writeAsText(resultPath);
+               ee.execute();
+
+               expected = "a\nb";
+
+       }
        
        public static class OneMapper implements MapFunction<Tuple3<Integer, 
Long, String>, Tuple1<Integer>> {
                private static final long serialVersionUID = 1L;

Reply via email to