Repository: flink Updated Branches: refs/heads/master 0dea359b3 -> 0081fb2ef
[FLINK-2135] Fix faulty cast to GroupReduceFunction This closes #769 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0081fb2e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0081fb2e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0081fb2e Branch: refs/heads/master Commit: 0081fb2ef2bd03d06a786dd8988865d2ff6168c2 Parents: 0dea359 Author: Robert Metzger <[email protected]> Authored: Wed Jun 3 15:52:12 2015 +0200 Committer: Robert Metzger <[email protected]> Committed: Thu Jun 4 11:48:06 2015 +0200 ---------------------------------------------------------------------- ...PlanUnwrappingSortedReduceGroupOperator.java | 8 +++--- .../test/javaApiOperators/FirstNITCase.java | 29 ++++++++++++++++++++ 2 files changed, 33 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0081fb2e/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java index 63ebfa4..e4d41f4 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java @@ -37,7 +37,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr public PlanUnwrappingSortedReduceGroupOperator(GroupReduceFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K1> groupingKey, Keys.SelectorFunctionKeys<IN, K2> sortingKey, String name, TypeInformation<OUT> outType, TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey, boolean combinable) { - super(combinable ? new TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2>((RichGroupReduceFunction<IN, OUT>) udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K1, K2>(udf), + super(combinable ? new TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2>(udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K1, K2>(udf), new UnaryOperatorInformation<Tuple3<K1, K2, IN>, OUT>(typeInfoWithKey, outType), groupingKey.computeLogicalKeyPositions(), name); super.setCombinable(combinable); @@ -46,7 +46,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr // -------------------------------------------------------------------------------------------- @RichGroupReduceFunction.Combinable - public static final class TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<RichGroupReduceFunction<IN, OUT>> + public static final class TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<GroupReduceFunction<IN, OUT>> implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>, GroupCombineFunction<Tuple3<K1, K2, IN>, Tuple3<K1, K2, IN>> { @@ -55,7 +55,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr private Tuple3UnwrappingIterator<IN, K1, K2> iter; private Tuple3WrappingCollector<IN, K1, K2> coll; - private TupleUnwrappingGroupCombinableGroupReducer(RichGroupReduceFunction<IN, OUT> wrapped) { + private TupleUnwrappingGroupCombinableGroupReducer(GroupReduceFunction<IN, OUT> wrapped) { super(wrapped); this.iter = new Tuple3UnwrappingIterator<IN, K1, K2>(); this.coll = new Tuple3WrappingCollector<IN, K1, K2>(this.iter); @@ -72,7 +72,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr public void combine(Iterable<Tuple3<K1, K2, IN>> values, Collector<Tuple3<K1, K2, IN>> out) throws Exception { iter.set(values.iterator()); coll.set(out); - this.wrappedFunction.combine(iter, coll); + ((GroupCombineFunction)this.wrappedFunction).combine(iter, coll); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/0081fb2e/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java index feb1169..15d98dd 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java @@ -22,6 +22,8 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.operators.GroupReduceOperator; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; @@ -114,6 +116,33 @@ public class FirstNITCase extends MultipleProgramsTestBase { + "(5,15)\n(5,14)\n(5,13)\n" + "(6,21)\n(6,20)\n(6,19)\n"; } + + /** + * Test for FLINK-2135 + */ + @Test + public void testFaultyCast() throws Exception { + ExecutionEnvironment ee = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<String> b = ee.fromElements("a", "b"); + GroupReduceOperator<String, String> a = b.groupBy(new KeySelector<String, Long>() { + @Override + public Long getKey(String value) throws Exception { + return 1L; + } + }).sortGroup(new KeySelector<String, Double>() { + @Override + public Double getKey(String value) throws Exception { + return 1.0; + } + }, Order.DESCENDING).first(1); + + b.writeAsText(resultPath); + ee.execute(); + + expected = "a\nb"; + + } public static class OneMapper implements MapFunction<Tuple3<Integer, Long, String>, Tuple1<Integer>> { private static final long serialVersionUID = 1L;
