http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb978b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java index 7e88838..dcea16d 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java @@ -30,7 +30,9 @@ import org.apache.flink.api.common.operators.DualInputSemanticProperties; import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.functions.FunctionAnnotation; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; @@ -41,7 +43,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.api.java.tuple.Tuple8; -import org.apache.flink.api.java.typeutils.TypeInfoParser; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.util.Collector; import org.junit.Assert; @@ -60,6 +62,10 @@ import static org.junit.Assert.assertEquals; @SuppressWarnings("serial") public class UdfAnalyzerTest { + private static final TypeInformation<Tuple2<String, Integer>> STRING_INT_TUPLE2_TYPE_INFO = TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){}); + + private static final TypeInformation<Tuple2<String, String>> STRING_STRING_TUPLE2_TYPE_INFO = TypeInformation.of(new TypeHint<Tuple2<String, String>>(){}); + @ForwardedFields("f0->*") private static class Map1 implements MapFunction<Tuple2<String, Integer>, String> { public String map(Tuple2<String, Integer> value) throws Exception { @@ -69,8 +75,8 @@ public class UdfAnalyzerTest { @Test public void testSingleFieldExtract() { - compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map1.class, "Tuple2<String,Integer>", - "String"); + compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map1.class, + STRING_INT_TUPLE2_TYPE_INFO, Types.STRING); } @ForwardedFields("f0->f0;f0->f1") @@ -82,8 +88,8 @@ public class UdfAnalyzerTest { @Test public void testForwardIntoTuple() { - compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map2.class, "Tuple2<String,Integer>", - "Tuple2<String,String>"); + compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map2.class, + STRING_INT_TUPLE2_TYPE_INFO, STRING_STRING_TUPLE2_TYPE_INFO); } private static class Map3 implements MapFunction<String[], Integer> { @@ -96,7 +102,8 @@ public class UdfAnalyzerTest { @Test public void testForwardWithArrayAttrAccess() { - compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map3.class, "String[]", "Integer"); + compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map3.class, + TypeInformation.of(new TypeHint<String[]>(){}), Types.INT); } private static class Map4 implements MapFunction<MyPojo, String> { @@ -109,7 +116,7 @@ public class UdfAnalyzerTest { @Test public void testForwardWithGenericTypePublicAttrAccess() { compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map4.class, - "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo", "String"); + new GenericTypeInfo<>(MyPojo.class), Types.STRING); } @ForwardedFields("field2->*") @@ -123,7 +130,7 @@ public class UdfAnalyzerTest { @Test public void testForwardWithPojoPublicAttrAccess() { compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map5.class, - "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>", "String"); + TypeInformation.of(new TypeHint<MyPojo>(){}), Types.STRING); } @ForwardedFields("field->*") @@ -137,7 +144,7 @@ public class UdfAnalyzerTest { @Test public void testForwardWithPojoPrivateAttrAccess() { compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map6.class, - "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>", "String"); + TypeInformation.of(new TypeHint<MyPojo>(){}), Types.STRING); } @ForwardedFields("f0->f1") @@ -153,8 +160,8 @@ public class UdfAnalyzerTest { @Test public void testForwardIntoTupleWithCondition() { - compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map7.class, "Tuple2<String,Integer>", - "Tuple2<String,String>"); + compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map7.class, + STRING_INT_TUPLE2_TYPE_INFO, STRING_STRING_TUPLE2_TYPE_INFO); } private static class Map8 implements MapFunction<Tuple2<String, String>, String> { @@ -169,8 +176,8 @@ public class UdfAnalyzerTest { @Test public void testSingleFieldExtractWithCondition() { - compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map8.class, "Tuple2<String,String>", - "String"); + compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map8.class, + STRING_STRING_TUPLE2_TYPE_INFO, Types.STRING); } @ForwardedFields("*->f0") @@ -185,7 +192,8 @@ public class UdfAnalyzerTest { @Test public void testForwardIntoTupleWithInstanceVar() { - compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map9.class, "String", "Tuple1<String>"); + compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map9.class, Types.STRING, + TypeInformation.of(new TypeHint<Tuple1<String>>(){})); } @ForwardedFields("*->f0.f0") @@ -200,8 +208,8 @@ public class UdfAnalyzerTest { @Test public void testForwardIntoTupleWithInstanceVar2() { - compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map10.class, "String", - "Tuple1<Tuple1<String>>"); + compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map10.class, Types.STRING, + TypeInformation.of(new TypeHint<Tuple1<Tuple1<String>>>(){})); } @ForwardedFields("*->f1") @@ -222,8 +230,8 @@ public class UdfAnalyzerTest { @Test public void testForwardIntoTupleWithInstanceVarChangedByOtherMethod() { - compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map11.class, "String", - "Tuple2<String, String>"); + compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map11.class, Types.STRING, + STRING_STRING_TUPLE2_TYPE_INFO); } @ForwardedFields("f0->f0.f0;f0->f1.f0") @@ -236,8 +244,9 @@ public class UdfAnalyzerTest { @Test public void testForwardIntoNestedTuple() { - compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map12.class, "Tuple2<String,Integer>", - "Tuple2<Tuple1<String>,Tuple1<String>>"); + compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map12.class, + STRING_INT_TUPLE2_TYPE_INFO, + TypeInformation.of(new TypeHint<Tuple2<Tuple1<String>, Tuple1<String>>>(){})); } @ForwardedFields("f0->f1.f0") @@ -253,8 +262,9 @@ public class UdfAnalyzerTest { @Test public void testForwardIntoNestedTupleWithVarAndModification() { - compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map13.class, "Tuple2<String,Integer>", - "Tuple2<Tuple1<String>,Tuple1<String>>"); + compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map13.class, + STRING_INT_TUPLE2_TYPE_INFO, + TypeInformation.of(new TypeHint<Tuple2<Tuple1<String>, Tuple1<String>>>(){})); } @ForwardedFields("f0") @@ -268,8 +278,8 @@ public class UdfAnalyzerTest { @Test public void testForwardIntoTupleWithAssignment() { - compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map14.class, "Tuple2<String,Integer>", - "Tuple2<String,String>"); + compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map14.class, + STRING_INT_TUPLE2_TYPE_INFO, STRING_STRING_TUPLE2_TYPE_INFO); } @ForwardedFields("f0.f0->f0") @@ -284,7 +294,8 @@ public class UdfAnalyzerTest { @Test public void testForwardIntoTupleWithInputPath() { compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map15.class, - "Tuple2<Tuple1<String>,Integer>", "Tuple2<String,String>"); + TypeInformation.of(new TypeHint<Tuple2<Tuple1<String>, Integer>>(){}), + STRING_STRING_TUPLE2_TYPE_INFO); } @ForwardedFields("field->field2;field2->field") @@ -300,8 +311,7 @@ public class UdfAnalyzerTest { @Test public void testForwardIntoPojoByGettersAndSetters() { compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map16.class, - "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>", - "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>"); + TypeInformation.of(new TypeHint<MyPojo>(){}), TypeInformation.of(new TypeHint<MyPojo>(){})); } private static class Map17 implements MapFunction<String, Tuple1<String>> { @@ -319,7 +329,8 @@ public class UdfAnalyzerTest { @Test public void testForwardIntoTupleWithInstanceVarAndCondition() { - compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map17.class, "String", "Tuple1<String>"); + compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map17.class, Types.STRING, + TypeInformation.of(new TypeHint<Tuple1<String>>(){})); } private static class Map18 implements MapFunction<Tuple1<String>, ArrayList<String>> { @@ -333,8 +344,8 @@ public class UdfAnalyzerTest { @Test public void testForwardIntoUnsupportedObject() { - compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map18.class, "Tuple1<String>", - "java.util.ArrayList"); + compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map18.class, + TypeInformation.of(new TypeHint<Tuple1<String>>(){}), TypeInformation.of(new TypeHint<java.util.ArrayList>(){})); } @ForwardedFields("*->f0") @@ -351,7 +362,8 @@ public class UdfAnalyzerTest { @Test public void testForwardWithNewTupleToNewTupleAssignment() { - compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map19.class, "Integer", "Tuple1<Integer>"); + compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map19.class, Types.INT, + TypeInformation.of(new TypeHint<Tuple1<Integer>>(){})); } @ForwardedFields("f0;f1") @@ -371,7 +383,8 @@ public class UdfAnalyzerTest { @Test public void testForwardWithGetMethod() { compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map20.class, - "Tuple4<Integer, Integer, Integer, Integer>", "Tuple4<Integer, Integer, Integer, Integer>"); + TypeInformation.of(new TypeHint<Tuple4<Integer, Integer, Integer, Integer>>(){}), + TypeInformation.of(new TypeHint<Tuple4<Integer, Integer, Integer, Integer>>(){})); } @ForwardedFields("f0->f1;f1->f0") @@ -387,8 +400,8 @@ public class UdfAnalyzerTest { @Test public void testForwardWithSetMethod() { - compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map21.class, "Tuple2<Integer, Integer>", - "Tuple2<Integer, Integer>"); + compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map21.class, + TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}), TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){})); } @ForwardedFields("f0->f1;f1->f0") @@ -404,8 +417,8 @@ public class UdfAnalyzerTest { @Test public void testForwardIntoNewTupleWithSetMethod() { - compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map22.class, "Tuple2<Integer, Integer>", - "Tuple2<Integer, Integer>"); + compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map22.class, + TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}), TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){})); } @ForwardedFields("*") @@ -426,8 +439,8 @@ public class UdfAnalyzerTest { @Test public void testForwardWithGetMethod2() { - compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map23.class, "Tuple1<Integer>", - "Tuple1<Integer>"); + compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map23.class, + TypeInformation.of(new TypeHint<Tuple1<Integer>>(){}), TypeInformation.of(new TypeHint<Tuple1<Integer>>(){})); } private static class Map24 implements MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> { @@ -442,8 +455,8 @@ public class UdfAnalyzerTest { @Test public void testForwardWithSetMethod2() { - compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map24.class, "Tuple2<Integer, Integer>", - "Tuple2<Integer, Integer>"); + compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map24.class, + TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}), TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){})); } @ForwardedFields("f1->f0;f1") @@ -457,8 +470,8 @@ public class UdfAnalyzerTest { @Test public void testForwardWithModifiedInput() { - compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map25.class, "Tuple2<Integer, Integer>", - "Tuple2<Integer, Integer>"); + compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map25.class, + TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}), TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){})); } @ForwardedFields("*->1") @@ -482,8 +495,8 @@ public class UdfAnalyzerTest { @Test public void testForwardWithTuplesGetSetFieldMethods() { - compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map26.class, "Integer", - "Tuple2<Integer, Integer>"); + compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map26.class, Types.INT, + TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){})); } @ForwardedFields("2->3;3->7") @@ -515,8 +528,8 @@ public class UdfAnalyzerTest { @Test public void testForwardWithTuplesGetSetFieldMethods2() { compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map27.class, - "Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>", - "Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>"); + TypeInformation.of(new TypeHint<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>(){}), + TypeInformation.of(new TypeHint<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>(){})); } private static class Map28 implements MapFunction<Integer, Integer> { @@ -531,7 +544,7 @@ public class UdfAnalyzerTest { @Test public void testForwardWithBranching1() { - compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map28.class, "Integer", "Integer"); + compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map28.class, Types.INT, Types.INT); } @ForwardedFields("0") @@ -555,7 +568,8 @@ public class UdfAnalyzerTest { @Test public void testForwardWithBranching2() { compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map29.class, - "Tuple3<String, String, String>", "Tuple3<String, String, String>"); + TypeInformation.of(new TypeHint<Tuple3<String, String, String>>(){}), + TypeInformation.of(new TypeHint<Tuple3<String, String, String>>(){})); } private static class Map30 implements MapFunction<Tuple2<String, String>, String> { @@ -573,8 +587,8 @@ public class UdfAnalyzerTest { @Test public void testForwardWithBranching3() { - compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map30.class, "Tuple2<String,String>", - "String"); + compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map30.class, + STRING_STRING_TUPLE2_TYPE_INFO, Types.STRING); } @ForwardedFields("1->1;1->0") @@ -591,8 +605,8 @@ public class UdfAnalyzerTest { @Test public void testForwardWithInheritance() { - compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map31.class, "Tuple2<String,String>", - "Tuple2<String,String>"); + compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map31.class, + STRING_STRING_TUPLE2_TYPE_INFO, STRING_STRING_TUPLE2_TYPE_INFO); } @ForwardedFields("*") @@ -620,8 +634,8 @@ public class UdfAnalyzerTest { @Test public void testForwardWithUnboxingAndBoxing() { compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map32.class, - "Tuple8<Boolean, Character, Byte, Short, Integer, Long, Float, Double>", - "Tuple8<Boolean, Character, Byte, Short, Integer, Long, Float, Double>"); + TypeInformation.of(new TypeHint<Tuple8<Boolean, Character, Byte, Short, Integer, Long, Float, Double>>(){}), + TypeInformation.of(new TypeHint<Tuple8<Boolean, Character, Byte, Short, Integer, Long, Float, Double>>(){})); } private static class Map33 implements MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { @@ -640,8 +654,8 @@ public class UdfAnalyzerTest { @Test public void testForwardWithBranching4() { - compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map33.class, "Tuple2<Long, Long>", - "Tuple2<Long, Long>"); + compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map33.class, TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), + TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){})); } @ForwardedFields("1") @@ -663,8 +677,8 @@ public class UdfAnalyzerTest { @Test public void testForwardWithBranching5() { - compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map34.class, "Tuple2<Long, Long>", - "Tuple2<Long, Long>"); + compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map34.class, TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), + TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){})); } private static class Map35 implements MapFunction<String[], Tuple2<String[], String[]>> { @@ -678,8 +692,8 @@ public class UdfAnalyzerTest { @Test public void testForwardWithArrayModification() { - compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map35.class, "String[]", - "Tuple2<String[], String[]>"); + compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map35.class, TypeInformation.of(new TypeHint<String[]>(){}), + TypeInformation.of(new TypeHint<Tuple2<String[], String[]>>(){})); } private static class Map36 implements MapFunction<Tuple3<String, String, String>, Tuple3<String, String, String>> { @@ -696,8 +710,8 @@ public class UdfAnalyzerTest { @Test public void testForwardWithBranching6() { - compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map36.class, "Tuple3<String, String, String>", - "Tuple3<String, String, String>"); + compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map36.class, TypeInformation.of(new TypeHint<Tuple3<String, String, String>>(){}), + TypeInformation.of(new TypeHint<Tuple3<String, String, String>>(){})); } private static class Map37 implements MapFunction<Tuple1<Tuple1<String>>, Tuple1<Tuple1<String>>> { @@ -711,8 +725,8 @@ public class UdfAnalyzerTest { @Test public void testForwardWithGetAndModification() { - compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map37.class, "Tuple1<Tuple1<String>>", - "Tuple1<Tuple1<String>>"); + compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map37.class, TypeInformation.of(new TypeHint<Tuple1<Tuple1<String>>>(){}), + TypeInformation.of(new TypeHint<Tuple1<Tuple1<String>>>(){})); } @ForwardedFields("field") @@ -727,8 +741,8 @@ public class UdfAnalyzerTest { @Test public void testForwardWithInheritance2() { compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map38.class, - "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo2<field=String,field2=String>", - "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo2<field=String,field2=String>"); + TypeInformation.of(new TypeHint<MyPojo2>(){}), + TypeInformation.of(new TypeHint<MyPojo2>(){})); } private static class Map39 implements MapFunction<MyPojo, MyPojo> { @@ -743,8 +757,8 @@ public class UdfAnalyzerTest { @Test public void testForwardWithGenericTypeOutput() { compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map39.class, - "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>", - "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo"); + TypeInformation.of(new TypeHint<GenericTypeInfo<MyPojo>>(){}), + TypeInformation.of(new TypeHint<GenericTypeInfo<MyPojo>>(){})); } @ForwardedFields("field2") @@ -766,8 +780,8 @@ public class UdfAnalyzerTest { @Test public void testForwardWithRecursion() { compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map40.class, - "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>", - "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>"); + TypeInformation.of(new TypeHint<MyPojo>(){}), + TypeInformation.of(new TypeHint<MyPojo>(){})); } @ForwardedFields("field;field2") @@ -784,8 +798,8 @@ public class UdfAnalyzerTest { @Test public void testForwardWithGetRuntimeContext() { compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map41.class, - "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>", - "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>"); + TypeInformation.of(new TypeHint<MyPojo>(){}), + TypeInformation.of(new TypeHint<MyPojo>(){})); } @ForwardedFields("*") @@ -798,8 +812,8 @@ public class UdfAnalyzerTest { @Test public void testForwardWithCollector() { - compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, FlatMap1.class, "Tuple1<Integer>", - "Tuple1<Integer>"); + compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, FlatMap1.class, TypeInformation.of(new TypeHint<Tuple1<Integer>>(){}), + TypeInformation.of(new TypeHint<Tuple1<Integer>>(){})); } @ForwardedFields("0->1;1->0") @@ -817,8 +831,8 @@ public class UdfAnalyzerTest { @Test public void testForwardWith2Collectors() { - compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, FlatMap2.class, "Tuple2<Long, Long>", - "Tuple2<Long, Long>"); + compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, FlatMap2.class, TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), + TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){})); } private static class FlatMap3 implements FlatMapFunction<Tuple1<Integer>, Tuple1<Integer>> { @@ -835,8 +849,8 @@ public class UdfAnalyzerTest { @Test public void testForwardWithCollectorPassing() { - compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, FlatMap3.class, "Tuple1<Integer>", - "Tuple1<Integer>"); + compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, FlatMap3.class, TypeInformation.of(new TypeHint<Tuple1<Integer>>(){}), + TypeInformation.of(new TypeHint<Tuple1<Integer>>(){})); } @ForwardedFieldsFirst("f1->f1") @@ -850,8 +864,8 @@ public class UdfAnalyzerTest { @Test public void testForwardWithDualInput() { - compareAnalyzerResultWithAnnotationsDualInput(JoinFunction.class, Join1.class, "Tuple2<Long, Long>", - "Tuple2<Long, Long>", "Tuple2<Long, Long>"); + compareAnalyzerResultWithAnnotationsDualInput(JoinFunction.class, Join1.class, TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), + TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){})); } @ForwardedFieldsFirst("*") @@ -866,8 +880,8 @@ public class UdfAnalyzerTest { @Test public void testForwardWithDualInputAndCollector() { - compareAnalyzerResultWithAnnotationsDualInput(FlatJoinFunction.class, Join2.class, "Tuple2<Long, Long>", - "Tuple2<Long, Long>", "Tuple2<Long, Long>"); + compareAnalyzerResultWithAnnotationsDualInput(FlatJoinFunction.class, Join2.class, TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), + TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){})); } @ForwardedFields("0") @@ -881,7 +895,7 @@ public class UdfAnalyzerTest { @Test public void testForwardWithIterable() { compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce1.class, - "Tuple2<Long, Long>", "Tuple2<Long, Long>", new String[] { "0" }); + TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), new String[] { "0" }); } @ForwardedFields("1->0") @@ -907,7 +921,7 @@ public class UdfAnalyzerTest { @Test public void testForwardWithIterable2() { compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce2.class, - "Tuple2<Long, Long>", "Tuple2<Long, Long>", new String[] { "0", "1" }); + TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), new String[] { "0", "1" }); } @ForwardedFields("field2") @@ -923,8 +937,8 @@ public class UdfAnalyzerTest { @Test public void testForwardWithIterable3() { compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce3.class, - "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>", - "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>", new String[] { "field2" }); + TypeInformation.of(new TypeHint<MyPojo>(){}), + TypeInformation.of(new TypeHint<MyPojo>(){}), new String[] { "field2" }); } @ForwardedFields("f0->*") @@ -942,7 +956,7 @@ public class UdfAnalyzerTest { @Test public void testForwardWithAtLeastOneIterationAssumption() { compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce4.class, - "Tuple2<Long, Long>", "Long", new String[] { "f0" }); + TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), Types.LONG, new String[] { "f0" }); } @ForwardedFields("f0->*") @@ -967,7 +981,7 @@ public class UdfAnalyzerTest { public void testForwardWithAtLeastOneIterationAssumptionForJavac() { // this test simulates javac behaviour in Eclipse IDE compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce4Javac.class, - "Tuple2<Long, Long>", "Long", new String[] { "f0" }); + TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), Types.LONG, new String[] { "f0" }); } private static class GroupReduce5 implements GroupReduceFunction<Tuple2<Long, Long>, Long> { @@ -987,7 +1001,7 @@ public class UdfAnalyzerTest { @Test public void testForwardWithAtLeastOneIterationAssumption2() { compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce5.class, - "Tuple2<Long, Long>", "Long", new String[] { "f1" }); + TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), Types.LONG, new String[] { "f1" }); } private static class GroupReduce6 implements GroupReduceFunction<Tuple2<Long, Long>, Long> { @@ -1005,7 +1019,7 @@ public class UdfAnalyzerTest { @Test public void testForwardWithAtLeastOneIterationAssumption3() { compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce6.class, - "Tuple2<Long, Long>", "Long", new String[] { "f0" }); + TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), Types.LONG, new String[] { "f0" }); } private static class GroupReduce7 implements GroupReduceFunction<Tuple2<Long, Long>, Long> { @@ -1023,7 +1037,7 @@ public class UdfAnalyzerTest { @Test public void testForwardWithAtLeastOneIterationAssumption4() { compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce7.class, - "Tuple2<Long, Long>", "Long", new String[] { "f0" }); + TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), Types.LONG, new String[] { "f0" }); } @ForwardedFields("f0->*") @@ -1042,7 +1056,7 @@ public class UdfAnalyzerTest { @Test public void testForwardWithAtLeastOneIterationAssumption5() { compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce8.class, - "Tuple2<Long, Long>", "Long", new String[] { "f0" }); + TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), Types.LONG, new String[] { "f0" }); } @ForwardedFields("f0") @@ -1061,7 +1075,7 @@ public class UdfAnalyzerTest { @Test public void testForwardWithAtLeastOneIterationAssumption6() { compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce9.class, - "Tuple2<Long, Long>", "Tuple2<Long, Long>", new String[] { "f0" }); + TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), new String[] { "f0" }); } private static class GroupReduce10 implements GroupReduceFunction<Tuple2<Long, Long>, Boolean> { @@ -1082,7 +1096,7 @@ public class UdfAnalyzerTest { @Test public void testForwardWithAtLeastOneIterationAssumption7() { compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce10.class, - "Tuple2<Long, Long>", "Boolean", new String[] { "f0" }); + TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), Types.BOOLEAN, new String[] { "f0" }); } @ForwardedFields("field") @@ -1096,9 +1110,9 @@ public class UdfAnalyzerTest { @Test public void testForwardWithReduce() { compareAnalyzerResultWithAnnotationsSingleInputWithKeys(ReduceFunction.class, Reduce1.class, - "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>", - "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>", - new String[] { "field" }); + TypeInformation.of(new TypeHint<MyPojo>(){}), + TypeInformation.of(new TypeHint<MyPojo>(){}), + new String[] { "field" }); } @ForwardedFields("field") @@ -1115,9 +1129,9 @@ public class UdfAnalyzerTest { @Test public void testForwardWithBranchingReduce() { compareAnalyzerResultWithAnnotationsSingleInputWithKeys(ReduceFunction.class, Reduce2.class, - "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>", - "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>", - new String[] { "field" }); + TypeInformation.of(new TypeHint<MyPojo>(){}), + TypeInformation.of(new TypeHint<MyPojo>(){}), + new String[] { "field" }); } private static class NullReturnMapper1 implements MapFunction<String, String> { @@ -1215,7 +1229,7 @@ public class UdfAnalyzerTest { public void testFilterModificationException1() { try { final UdfAnalyzer ua = new UdfAnalyzer(FilterFunction.class, FilterMod1.class, "operator", - TypeInfoParser.parse("Tuple2<String, String>"), null, null, null, null, true); + STRING_STRING_TUPLE2_TYPE_INFO, null, null, null, null, true); ua.analyze(); Assert.fail(); } @@ -1237,7 +1251,7 @@ public class UdfAnalyzerTest { public void testFilterModificationException2() { try { final UdfAnalyzer ua = new UdfAnalyzer(FilterFunction.class, FilterMod2.class, "operator", - TypeInfoParser.parse("Tuple2<String, String>"), null, null, null, null, true); + STRING_STRING_TUPLE2_TYPE_INFO, null, null, null, null, true); ua.analyze(); Assert.fail(); } @@ -1303,17 +1317,14 @@ public class UdfAnalyzerTest { } } - public static void compareAnalyzerResultWithAnnotationsSingleInput(Class<?> baseClass, Class<?> clazz, String in, - String out) { - compareAnalyzerResultWithAnnotationsSingleInputWithKeys(baseClass, clazz, in, out, null); + public static void compareAnalyzerResultWithAnnotationsSingleInput(Class<?> baseClass, Class<?> clazz, + TypeInformation<?> inType, TypeInformation<?> outType) { + compareAnalyzerResultWithAnnotationsSingleInputWithKeys(baseClass, clazz, inType, outType, null); } @SuppressWarnings({ "rawtypes", "unchecked" }) public static void compareAnalyzerResultWithAnnotationsSingleInputWithKeys(Class<?> baseClass, Class<?> clazz, - String in, String out, String[] keys) { - final TypeInformation<?> inType = TypeInfoParser.parse(in); - final TypeInformation<?> outType = TypeInfoParser.parse(out); - + TypeInformation<?> inType, TypeInformation<?> outType, String[] keys) { // expected final Set<Annotation> annotations = FunctionAnnotation.readSingleForwardAnnotations(clazz); SingleInputSemanticProperties expected = SemanticPropUtil.getSemanticPropsSingle(annotations, inType, @@ -1331,18 +1342,14 @@ public class UdfAnalyzerTest { assertEquals(expected.toString(), actual.toString()); } - public static void compareAnalyzerResultWithAnnotationsDualInput(Class<?> baseClass, Class<?> clazz, String in1, - String in2, String out) { - compareAnalyzerResultWithAnnotationsDualInputWithKeys(baseClass, clazz, in1, in2, out, null, null); + public static void compareAnalyzerResultWithAnnotationsDualInput(Class<?> baseClass, Class<?> clazz, + TypeInformation<?> in1Type, TypeInformation<?> in2Type, TypeInformation<?> outType) { + compareAnalyzerResultWithAnnotationsDualInputWithKeys(baseClass, clazz, in1Type, in2Type, outType, null, null); } @SuppressWarnings({ "rawtypes", "unchecked" }) public static void compareAnalyzerResultWithAnnotationsDualInputWithKeys(Class<?> baseClass, Class<?> clazz, - String in1, String in2, String out, String[] keys1, String[] keys2) { - final TypeInformation<?> in1Type = TypeInfoParser.parse(in1); - final TypeInformation<?> in2Type = TypeInfoParser.parse(in2); - final TypeInformation<?> outType = TypeInfoParser.parse(out); - + TypeInformation<?> in1Type, TypeInformation<?> in2Type, TypeInformation<?> outType, String[] keys1, String[] keys2) { // expected final Set<Annotation> annotations = FunctionAnnotation.readDualForwardAnnotations(clazz); final DualInputSemanticProperties expected = SemanticPropUtil.getSemanticPropsDual(annotations, in1Type,
http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb978b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java index a31ce2e..06f486d 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java @@ -18,8 +18,7 @@ package org.apache.flink.graph.asm.dataset; -import org.apache.flink.api.common.typeinfo.TypeHint; -import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; @@ -62,7 +61,7 @@ public class ChecksumHashCodeTest { @Test public void testEmptyList() throws Exception { - DataSet<Long> dataset = env.fromCollection(Collections.emptyList(), TypeInformation.of(new TypeHint<Long>(){})); + DataSet<Long> dataset = env.fromCollection(Collections.emptyList(), Types.LONG); Checksum checksum = new ChecksumHashCode<Long>().run(dataset).execute(); http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb978b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java index cfeadce..027b809 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java @@ -18,8 +18,7 @@ package org.apache.flink.graph.asm.dataset; -import org.apache.flink.api.common.typeinfo.TypeHint; -import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -61,7 +60,7 @@ public class CollectTest { @Test public void testEmptyList() throws Exception { - DataSet<Long> dataset = env.fromCollection(Collections.emptyList(), TypeInformation.of(new TypeHint<Long>(){})); + DataSet<Long> dataset = env.fromCollection(Collections.emptyList(), Types.LONG); List<Long> collected = new Collect<Long>().run(dataset).execute(); http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb978b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java index 0167a5f..dc92c55 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java @@ -18,8 +18,7 @@ package org.apache.flink.graph.asm.dataset; -import org.apache.flink.api.common.typeinfo.TypeHint; -import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -60,7 +59,7 @@ public class CountTest { @Test public void testEmptyList() throws Exception { - DataSet<Long> dataset = env.fromCollection(Collections.emptyList(), TypeInformation.of(new TypeHint<Long>(){})); + DataSet<Long> dataset = env.fromCollection(Collections.emptyList(), Types.LONG); long count = new Count<Long>().run(dataset).execute();