http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb978b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
index 7e88838..dcea16d 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
@@ -30,7 +30,9 @@ import 
org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.functions.FunctionAnnotation;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
@@ -41,7 +43,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.tuple.Tuple8;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.util.Collector;
 
 import org.junit.Assert;
@@ -60,6 +62,10 @@ import static org.junit.Assert.assertEquals;
 @SuppressWarnings("serial")
 public class UdfAnalyzerTest {
 
+       private static final TypeInformation<Tuple2<String, Integer>> 
STRING_INT_TUPLE2_TYPE_INFO = TypeInformation.of(new TypeHint<Tuple2<String, 
Integer>>(){});
+
+       private static final TypeInformation<Tuple2<String, String>> 
STRING_STRING_TUPLE2_TYPE_INFO = TypeInformation.of(new TypeHint<Tuple2<String, 
String>>(){});
+
        @ForwardedFields("f0->*")
        private static class Map1 implements MapFunction<Tuple2<String, 
Integer>, String> {
                public String map(Tuple2<String, Integer> value) throws 
Exception {
@@ -69,8 +75,8 @@ public class UdfAnalyzerTest {
 
        @Test
        public void testSingleFieldExtract() {
-               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map1.class, 
"Tuple2<String,Integer>",
-                               "String");
+               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map1.class,
+                       STRING_INT_TUPLE2_TYPE_INFO, Types.STRING);
        }
 
        @ForwardedFields("f0->f0;f0->f1")
@@ -82,8 +88,8 @@ public class UdfAnalyzerTest {
 
        @Test
        public void testForwardIntoTuple() {
-               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map2.class, 
"Tuple2<String,Integer>",
-                               "Tuple2<String,String>");
+               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map2.class,
+                       STRING_INT_TUPLE2_TYPE_INFO, 
STRING_STRING_TUPLE2_TYPE_INFO);
        }
 
        private static class Map3 implements MapFunction<String[], Integer> {
@@ -96,7 +102,8 @@ public class UdfAnalyzerTest {
 
        @Test
        public void testForwardWithArrayAttrAccess() {
-               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map3.class, 
"String[]", "Integer");
+               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map3.class,
+                       TypeInformation.of(new TypeHint<String[]>(){}), 
Types.INT);
        }
 
        private static class Map4 implements MapFunction<MyPojo, String> {
@@ -109,7 +116,7 @@ public class UdfAnalyzerTest {
        @Test
        public void testForwardWithGenericTypePublicAttrAccess() {
                
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map4.class,
-                               
"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo", "String");
+                       new GenericTypeInfo<>(MyPojo.class), Types.STRING);
        }
 
        @ForwardedFields("field2->*")
@@ -123,7 +130,7 @@ public class UdfAnalyzerTest {
        @Test
        public void testForwardWithPojoPublicAttrAccess() {
                
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map5.class,
-                               
"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
 "String");
+                       TypeInformation.of(new TypeHint<MyPojo>(){}), 
Types.STRING);
        }
 
        @ForwardedFields("field->*")
@@ -137,7 +144,7 @@ public class UdfAnalyzerTest {
        @Test
        public void testForwardWithPojoPrivateAttrAccess() {
                
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map6.class,
-                               
"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
 "String");
+                       TypeInformation.of(new TypeHint<MyPojo>(){}), 
Types.STRING);
        }
 
        @ForwardedFields("f0->f1")
@@ -153,8 +160,8 @@ public class UdfAnalyzerTest {
 
        @Test
        public void testForwardIntoTupleWithCondition() {
-               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map7.class, 
"Tuple2<String,Integer>",
-                               "Tuple2<String,String>");
+               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map7.class,
+                       STRING_INT_TUPLE2_TYPE_INFO, 
STRING_STRING_TUPLE2_TYPE_INFO);
        }
 
        private static class Map8 implements MapFunction<Tuple2<String, 
String>, String> {
@@ -169,8 +176,8 @@ public class UdfAnalyzerTest {
 
        @Test
        public void testSingleFieldExtractWithCondition() {
-               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map8.class, 
"Tuple2<String,String>",
-                               "String");
+               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map8.class,
+                       STRING_STRING_TUPLE2_TYPE_INFO, Types.STRING);
        }
 
        @ForwardedFields("*->f0")
@@ -185,7 +192,8 @@ public class UdfAnalyzerTest {
 
        @Test
        public void testForwardIntoTupleWithInstanceVar() {
-               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map9.class, 
"String", "Tuple1<String>");
+               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map9.class, 
Types.STRING,
+                       TypeInformation.of(new TypeHint<Tuple1<String>>(){}));
        }
 
        @ForwardedFields("*->f0.f0")
@@ -200,8 +208,8 @@ public class UdfAnalyzerTest {
 
        @Test
        public void testForwardIntoTupleWithInstanceVar2() {
-               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map10.class, 
"String",
-                               "Tuple1<Tuple1<String>>");
+               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map10.class, 
Types.STRING,
+                       TypeInformation.of(new 
TypeHint<Tuple1<Tuple1<String>>>(){}));
        }
 
        @ForwardedFields("*->f1")
@@ -222,8 +230,8 @@ public class UdfAnalyzerTest {
 
        @Test
        public void testForwardIntoTupleWithInstanceVarChangedByOtherMethod() {
-               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map11.class, 
"String",
-                               "Tuple2<String, String>");
+               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map11.class, 
Types.STRING,
+                       STRING_STRING_TUPLE2_TYPE_INFO);
        }
 
        @ForwardedFields("f0->f0.f0;f0->f1.f0")
@@ -236,8 +244,9 @@ public class UdfAnalyzerTest {
 
        @Test
        public void testForwardIntoNestedTuple() {
-               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map12.class, 
"Tuple2<String,Integer>",
-                               "Tuple2<Tuple1<String>,Tuple1<String>>");
+               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map12.class,
+                       STRING_INT_TUPLE2_TYPE_INFO,
+                       TypeInformation.of(new TypeHint<Tuple2<Tuple1<String>, 
Tuple1<String>>>(){}));
        }
 
        @ForwardedFields("f0->f1.f0")
@@ -253,8 +262,9 @@ public class UdfAnalyzerTest {
 
        @Test
        public void testForwardIntoNestedTupleWithVarAndModification() {
-               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map13.class, 
"Tuple2<String,Integer>",
-                               "Tuple2<Tuple1<String>,Tuple1<String>>");
+               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map13.class,
+                       STRING_INT_TUPLE2_TYPE_INFO,
+                       TypeInformation.of(new TypeHint<Tuple2<Tuple1<String>, 
Tuple1<String>>>(){}));
        }
 
        @ForwardedFields("f0")
@@ -268,8 +278,8 @@ public class UdfAnalyzerTest {
 
        @Test
        public void testForwardIntoTupleWithAssignment() {
-               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map14.class, 
"Tuple2<String,Integer>",
-                               "Tuple2<String,String>");
+               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map14.class,
+                       STRING_INT_TUPLE2_TYPE_INFO, 
STRING_STRING_TUPLE2_TYPE_INFO);
        }
 
        @ForwardedFields("f0.f0->f0")
@@ -284,7 +294,8 @@ public class UdfAnalyzerTest {
        @Test
        public void testForwardIntoTupleWithInputPath() {
                
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map15.class,
-                               "Tuple2<Tuple1<String>,Integer>", 
"Tuple2<String,String>");
+                       TypeInformation.of(new TypeHint<Tuple2<Tuple1<String>, 
Integer>>(){}),
+                       STRING_STRING_TUPLE2_TYPE_INFO);
        }
 
        @ForwardedFields("field->field2;field2->field")
@@ -300,8 +311,7 @@ public class UdfAnalyzerTest {
        @Test
        public void testForwardIntoPojoByGettersAndSetters() {
                
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map16.class,
-                               
"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
-                               
"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>");
+                       TypeInformation.of(new TypeHint<MyPojo>(){}), 
TypeInformation.of(new TypeHint<MyPojo>(){}));
        }
 
        private static class Map17 implements MapFunction<String, 
Tuple1<String>> {
@@ -319,7 +329,8 @@ public class UdfAnalyzerTest {
 
        @Test
        public void testForwardIntoTupleWithInstanceVarAndCondition() {
-               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map17.class, 
"String", "Tuple1<String>");
+               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map17.class, 
Types.STRING,
+                       TypeInformation.of(new TypeHint<Tuple1<String>>(){}));
        }
 
        private static class Map18 implements MapFunction<Tuple1<String>, 
ArrayList<String>> {
@@ -333,8 +344,8 @@ public class UdfAnalyzerTest {
 
        @Test
        public void testForwardIntoUnsupportedObject() {
-               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map18.class, 
"Tuple1<String>",
-                               "java.util.ArrayList");
+               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map18.class,
+                       TypeInformation.of(new TypeHint<Tuple1<String>>(){}), 
TypeInformation.of(new TypeHint<java.util.ArrayList>(){}));
        }
 
        @ForwardedFields("*->f0")
@@ -351,7 +362,8 @@ public class UdfAnalyzerTest {
 
        @Test
        public void testForwardWithNewTupleToNewTupleAssignment() {
-               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map19.class, 
"Integer", "Tuple1<Integer>");
+               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map19.class, 
Types.INT,
+                       TypeInformation.of(new TypeHint<Tuple1<Integer>>(){}));
        }
 
        @ForwardedFields("f0;f1")
@@ -371,7 +383,8 @@ public class UdfAnalyzerTest {
        @Test
        public void testForwardWithGetMethod() {
                
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map20.class,
-                               "Tuple4<Integer, Integer, Integer, Integer>", 
"Tuple4<Integer, Integer, Integer, Integer>");
+                       TypeInformation.of(new TypeHint<Tuple4<Integer, 
Integer, Integer, Integer>>(){}),
+                       TypeInformation.of(new TypeHint<Tuple4<Integer, 
Integer, Integer, Integer>>(){}));
        }
 
        @ForwardedFields("f0->f1;f1->f0")
@@ -387,8 +400,8 @@ public class UdfAnalyzerTest {
 
        @Test
        public void testForwardWithSetMethod() {
-               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map21.class, 
"Tuple2<Integer, Integer>",
-                               "Tuple2<Integer, Integer>");
+               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map21.class,
+                       TypeInformation.of(new TypeHint<Tuple2<Integer, 
Integer>>(){}), TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}));
        }
 
        @ForwardedFields("f0->f1;f1->f0")
@@ -404,8 +417,8 @@ public class UdfAnalyzerTest {
 
        @Test
        public void testForwardIntoNewTupleWithSetMethod() {
-               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map22.class, 
"Tuple2<Integer, Integer>",
-                               "Tuple2<Integer, Integer>");
+               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map22.class,
+                       TypeInformation.of(new TypeHint<Tuple2<Integer, 
Integer>>(){}), TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}));
        }
 
        @ForwardedFields("*")
@@ -426,8 +439,8 @@ public class UdfAnalyzerTest {
 
        @Test
        public void testForwardWithGetMethod2() {
-               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map23.class, 
"Tuple1<Integer>",
-                               "Tuple1<Integer>");
+               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map23.class,
+                       TypeInformation.of(new TypeHint<Tuple1<Integer>>(){}), 
TypeInformation.of(new TypeHint<Tuple1<Integer>>(){}));
        }
 
        private static class Map24 implements MapFunction<Tuple2<Integer, 
Integer>, Tuple2<Integer, Integer>> {
@@ -442,8 +455,8 @@ public class UdfAnalyzerTest {
 
        @Test
        public void testForwardWithSetMethod2() {
-               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map24.class, 
"Tuple2<Integer, Integer>",
-                               "Tuple2<Integer, Integer>");
+               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map24.class,
+                       TypeInformation.of(new TypeHint<Tuple2<Integer, 
Integer>>(){}), TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}));
        }
 
        @ForwardedFields("f1->f0;f1")
@@ -457,8 +470,8 @@ public class UdfAnalyzerTest {
 
        @Test
        public void testForwardWithModifiedInput() {
-               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map25.class, 
"Tuple2<Integer, Integer>",
-                               "Tuple2<Integer, Integer>");
+               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map25.class,
+                       TypeInformation.of(new TypeHint<Tuple2<Integer, 
Integer>>(){}), TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}));
        }
 
        @ForwardedFields("*->1")
@@ -482,8 +495,8 @@ public class UdfAnalyzerTest {
 
        @Test
        public void testForwardWithTuplesGetSetFieldMethods() {
-               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map26.class, 
"Integer",
-                               "Tuple2<Integer, Integer>");
+               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map26.class, 
Types.INT,
+                       TypeInformation.of(new TypeHint<Tuple2<Integer, 
Integer>>(){}));
        }
 
        @ForwardedFields("2->3;3->7")
@@ -515,8 +528,8 @@ public class UdfAnalyzerTest {
        @Test
        public void testForwardWithTuplesGetSetFieldMethods2() {
                
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map27.class,
-                               "Tuple8<Integer, Integer, Integer, Integer, 
Integer, Integer, Integer, Integer>",
-                               "Tuple8<Integer, Integer, Integer, Integer, 
Integer, Integer, Integer, Integer>");
+                       TypeInformation.of(new TypeHint<Tuple8<Integer, 
Integer, Integer, Integer, Integer, Integer, Integer, Integer>>(){}),
+                       TypeInformation.of(new TypeHint<Tuple8<Integer, 
Integer, Integer, Integer, Integer, Integer, Integer, Integer>>(){}));
        }
 
        private static class Map28 implements MapFunction<Integer, Integer> {
@@ -531,7 +544,7 @@ public class UdfAnalyzerTest {
 
        @Test
        public void testForwardWithBranching1() {
-               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map28.class, 
"Integer", "Integer");
+               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map28.class, 
Types.INT, Types.INT);
        }
 
        @ForwardedFields("0")
@@ -555,7 +568,8 @@ public class UdfAnalyzerTest {
        @Test
        public void testForwardWithBranching2() {
                
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map29.class,
-                               "Tuple3<String, String, String>", 
"Tuple3<String, String, String>");
+                       TypeInformation.of(new TypeHint<Tuple3<String, String, 
String>>(){}),
+                       TypeInformation.of(new TypeHint<Tuple3<String, String, 
String>>(){}));
        }
 
        private static class Map30 implements MapFunction<Tuple2<String, 
String>, String> {
@@ -573,8 +587,8 @@ public class UdfAnalyzerTest {
 
        @Test
        public void testForwardWithBranching3() {
-               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map30.class, 
"Tuple2<String,String>",
-                               "String");
+               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map30.class,
+                       STRING_STRING_TUPLE2_TYPE_INFO, Types.STRING);
        }
 
        @ForwardedFields("1->1;1->0")
@@ -591,8 +605,8 @@ public class UdfAnalyzerTest {
 
        @Test
        public void testForwardWithInheritance() {
-               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map31.class, 
"Tuple2<String,String>",
-                               "Tuple2<String,String>");
+               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map31.class,
+                       STRING_STRING_TUPLE2_TYPE_INFO, 
STRING_STRING_TUPLE2_TYPE_INFO);
        }
 
        @ForwardedFields("*")
@@ -620,8 +634,8 @@ public class UdfAnalyzerTest {
        @Test
        public void testForwardWithUnboxingAndBoxing() {
                
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map32.class,
-                               "Tuple8<Boolean, Character, Byte, Short, 
Integer, Long, Float, Double>",
-                               "Tuple8<Boolean, Character, Byte, Short, 
Integer, Long, Float, Double>");
+                       TypeInformation.of(new TypeHint<Tuple8<Boolean, 
Character, Byte, Short, Integer, Long, Float, Double>>(){}),
+                       TypeInformation.of(new TypeHint<Tuple8<Boolean, 
Character, Byte, Short, Integer, Long, Float, Double>>(){}));
        }
 
        private static class Map33 implements MapFunction<Tuple2<Long, Long>, 
Tuple2<Long, Long>> {
@@ -640,8 +654,8 @@ public class UdfAnalyzerTest {
 
        @Test
        public void testForwardWithBranching4() {
-               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map33.class, 
"Tuple2<Long, Long>",
-                               "Tuple2<Long, Long>");
+               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map33.class, 
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}),
+                       TypeInformation.of(new TypeHint<Tuple2<Long, 
Long>>(){}));
        }
 
        @ForwardedFields("1")
@@ -663,8 +677,8 @@ public class UdfAnalyzerTest {
 
        @Test
        public void testForwardWithBranching5() {
-               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map34.class, 
"Tuple2<Long, Long>",
-                               "Tuple2<Long, Long>");
+               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map34.class, 
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}),
+                       TypeInformation.of(new TypeHint<Tuple2<Long, 
Long>>(){}));
        }
 
        private static class Map35 implements MapFunction<String[], 
Tuple2<String[], String[]>> {
@@ -678,8 +692,8 @@ public class UdfAnalyzerTest {
 
        @Test
        public void testForwardWithArrayModification() {
-               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map35.class, 
"String[]",
-                               "Tuple2<String[], String[]>");
+               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map35.class, 
TypeInformation.of(new TypeHint<String[]>(){}),
+                       TypeInformation.of(new TypeHint<Tuple2<String[], 
String[]>>(){}));
        }
 
        private static class Map36 implements MapFunction<Tuple3<String, 
String, String>, Tuple3<String, String, String>> {
@@ -696,8 +710,8 @@ public class UdfAnalyzerTest {
 
        @Test
        public void testForwardWithBranching6() {
-               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map36.class, 
"Tuple3<String, String, String>",
-                               "Tuple3<String, String, String>");
+               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map36.class, 
TypeInformation.of(new TypeHint<Tuple3<String, String, String>>(){}),
+                       TypeInformation.of(new TypeHint<Tuple3<String, String, 
String>>(){}));
        }
 
        private static class Map37 implements 
MapFunction<Tuple1<Tuple1<String>>, Tuple1<Tuple1<String>>> {
@@ -711,8 +725,8 @@ public class UdfAnalyzerTest {
 
        @Test
        public void testForwardWithGetAndModification() {
-               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map37.class, 
"Tuple1<Tuple1<String>>",
-                               "Tuple1<Tuple1<String>>");
+               
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map37.class, 
TypeInformation.of(new TypeHint<Tuple1<Tuple1<String>>>(){}),
+                       TypeInformation.of(new 
TypeHint<Tuple1<Tuple1<String>>>(){}));
        }
 
        @ForwardedFields("field")
@@ -727,8 +741,8 @@ public class UdfAnalyzerTest {
        @Test
        public void testForwardWithInheritance2() {
                
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map38.class,
-                               
"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo2<field=String,field2=String>",
-                               
"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo2<field=String,field2=String>");
+                       TypeInformation.of(new TypeHint<MyPojo2>(){}),
+                       TypeInformation.of(new TypeHint<MyPojo2>(){}));
        }
 
        private static class Map39 implements MapFunction<MyPojo, MyPojo> {
@@ -743,8 +757,8 @@ public class UdfAnalyzerTest {
        @Test
        public void testForwardWithGenericTypeOutput() {
                
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map39.class,
-                               
"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
-                               
"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo");
+                       TypeInformation.of(new 
TypeHint<GenericTypeInfo<MyPojo>>(){}),
+                       TypeInformation.of(new 
TypeHint<GenericTypeInfo<MyPojo>>(){}));
        }
 
        @ForwardedFields("field2")
@@ -766,8 +780,8 @@ public class UdfAnalyzerTest {
        @Test
        public void testForwardWithRecursion() {
                
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map40.class,
-                               
"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
-                               
"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>");
+                       TypeInformation.of(new TypeHint<MyPojo>(){}),
+                       TypeInformation.of(new TypeHint<MyPojo>(){}));
        }
 
        @ForwardedFields("field;field2")
@@ -784,8 +798,8 @@ public class UdfAnalyzerTest {
        @Test
        public void testForwardWithGetRuntimeContext() {
                
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map41.class,
-                               
"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
-                               
"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>");
+                       TypeInformation.of(new TypeHint<MyPojo>(){}),
+                       TypeInformation.of(new TypeHint<MyPojo>(){}));
        }
 
        @ForwardedFields("*")
@@ -798,8 +812,8 @@ public class UdfAnalyzerTest {
 
        @Test
        public void testForwardWithCollector() {
-               
compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, 
FlatMap1.class, "Tuple1<Integer>",
-                               "Tuple1<Integer>");
+               
compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, 
FlatMap1.class, TypeInformation.of(new TypeHint<Tuple1<Integer>>(){}),
+                       TypeInformation.of(new TypeHint<Tuple1<Integer>>(){}));
        }
 
        @ForwardedFields("0->1;1->0")
@@ -817,8 +831,8 @@ public class UdfAnalyzerTest {
 
        @Test
        public void testForwardWith2Collectors() {
-               
compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, 
FlatMap2.class, "Tuple2<Long, Long>",
-                               "Tuple2<Long, Long>");
+               
compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, 
FlatMap2.class, TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}),
+                       TypeInformation.of(new TypeHint<Tuple2<Long, 
Long>>(){}));
        }
 
        private static class FlatMap3 implements 
FlatMapFunction<Tuple1<Integer>, Tuple1<Integer>> {
@@ -835,8 +849,8 @@ public class UdfAnalyzerTest {
 
        @Test
        public void testForwardWithCollectorPassing() {
-               
compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, 
FlatMap3.class, "Tuple1<Integer>",
-                               "Tuple1<Integer>");
+               
compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, 
FlatMap3.class, TypeInformation.of(new TypeHint<Tuple1<Integer>>(){}),
+                       TypeInformation.of(new TypeHint<Tuple1<Integer>>(){}));
        }
 
        @ForwardedFieldsFirst("f1->f1")
@@ -850,8 +864,8 @@ public class UdfAnalyzerTest {
 
        @Test
        public void testForwardWithDualInput() {
-               
compareAnalyzerResultWithAnnotationsDualInput(JoinFunction.class, Join1.class, 
"Tuple2<Long, Long>",
-                               "Tuple2<Long, Long>", "Tuple2<Long, Long>");
+               
compareAnalyzerResultWithAnnotationsDualInput(JoinFunction.class, Join1.class, 
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}),
+                       TypeInformation.of(new TypeHint<Tuple2<Long, 
Long>>(){}), TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}));
        }
 
        @ForwardedFieldsFirst("*")
@@ -866,8 +880,8 @@ public class UdfAnalyzerTest {
 
        @Test
        public void testForwardWithDualInputAndCollector() {
-               
compareAnalyzerResultWithAnnotationsDualInput(FlatJoinFunction.class, 
Join2.class, "Tuple2<Long, Long>",
-                               "Tuple2<Long, Long>", "Tuple2<Long, Long>");
+               
compareAnalyzerResultWithAnnotationsDualInput(FlatJoinFunction.class, 
Join2.class, TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}),
+                       TypeInformation.of(new TypeHint<Tuple2<Long, 
Long>>(){}), TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}));
        }
 
        @ForwardedFields("0")
@@ -881,7 +895,7 @@ public class UdfAnalyzerTest {
        @Test
        public void testForwardWithIterable() {
                
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class,
 GroupReduce1.class,
-                               "Tuple2<Long, Long>", "Tuple2<Long, Long>", new 
String[] { "0" });
+                       TypeInformation.of(new TypeHint<Tuple2<Long, 
Long>>(){}), TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), new 
String[] { "0" });
        }
 
        @ForwardedFields("1->0")
@@ -907,7 +921,7 @@ public class UdfAnalyzerTest {
        @Test
        public void testForwardWithIterable2() {
                
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class,
 GroupReduce2.class,
-                               "Tuple2<Long, Long>", "Tuple2<Long, Long>", new 
String[] { "0", "1" });
+                       TypeInformation.of(new TypeHint<Tuple2<Long, 
Long>>(){}), TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), new 
String[] { "0", "1" });
        }
 
        @ForwardedFields("field2")
@@ -923,8 +937,8 @@ public class UdfAnalyzerTest {
        @Test
        public void testForwardWithIterable3() {
                
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class,
 GroupReduce3.class,
-                               
"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
-                               
"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
 new String[] { "field2" });
+                       TypeInformation.of(new TypeHint<MyPojo>(){}),
+                       TypeInformation.of(new TypeHint<MyPojo>(){}), new 
String[] { "field2" });
        }
 
        @ForwardedFields("f0->*")
@@ -942,7 +956,7 @@ public class UdfAnalyzerTest {
        @Test
        public void testForwardWithAtLeastOneIterationAssumption() {
                
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class,
 GroupReduce4.class,
-                               "Tuple2<Long, Long>", "Long", new String[] { 
"f0" });
+                       TypeInformation.of(new TypeHint<Tuple2<Long, 
Long>>(){}), Types.LONG, new String[] { "f0" });
        }
 
        @ForwardedFields("f0->*")
@@ -967,7 +981,7 @@ public class UdfAnalyzerTest {
        public void testForwardWithAtLeastOneIterationAssumptionForJavac() {
                // this test simulates javac behaviour in Eclipse IDE
                
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class,
 GroupReduce4Javac.class,
-                               "Tuple2<Long, Long>", "Long", new String[] { 
"f0" });
+                       TypeInformation.of(new TypeHint<Tuple2<Long, 
Long>>(){}), Types.LONG, new String[] { "f0" });
        }
 
        private static class GroupReduce5 implements 
GroupReduceFunction<Tuple2<Long, Long>, Long> {
@@ -987,7 +1001,7 @@ public class UdfAnalyzerTest {
        @Test
        public void testForwardWithAtLeastOneIterationAssumption2() {
                
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class,
 GroupReduce5.class,
-                               "Tuple2<Long, Long>", "Long", new String[] { 
"f1" });
+                       TypeInformation.of(new TypeHint<Tuple2<Long, 
Long>>(){}), Types.LONG, new String[] { "f1" });
        }
 
        private static class GroupReduce6 implements 
GroupReduceFunction<Tuple2<Long, Long>, Long> {
@@ -1005,7 +1019,7 @@ public class UdfAnalyzerTest {
        @Test
        public void testForwardWithAtLeastOneIterationAssumption3() {
                
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class,
 GroupReduce6.class,
-                               "Tuple2<Long, Long>", "Long", new String[] { 
"f0" });
+                       TypeInformation.of(new TypeHint<Tuple2<Long, 
Long>>(){}), Types.LONG, new String[] { "f0" });
        }
 
        private static class GroupReduce7 implements 
GroupReduceFunction<Tuple2<Long, Long>, Long> {
@@ -1023,7 +1037,7 @@ public class UdfAnalyzerTest {
        @Test
        public void testForwardWithAtLeastOneIterationAssumption4() {
                
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class,
 GroupReduce7.class,
-                               "Tuple2<Long, Long>", "Long", new String[] { 
"f0" });
+                       TypeInformation.of(new TypeHint<Tuple2<Long, 
Long>>(){}), Types.LONG, new String[] { "f0" });
        }
 
        @ForwardedFields("f0->*")
@@ -1042,7 +1056,7 @@ public class UdfAnalyzerTest {
        @Test
        public void testForwardWithAtLeastOneIterationAssumption5() {
                
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class,
 GroupReduce8.class,
-                               "Tuple2<Long, Long>", "Long", new String[] { 
"f0" });
+                       TypeInformation.of(new TypeHint<Tuple2<Long, 
Long>>(){}), Types.LONG, new String[] { "f0" });
        }
 
        @ForwardedFields("f0")
@@ -1061,7 +1075,7 @@ public class UdfAnalyzerTest {
        @Test
        public void testForwardWithAtLeastOneIterationAssumption6() {
                
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class,
 GroupReduce9.class,
-                               "Tuple2<Long, Long>", "Tuple2<Long, Long>", new 
String[] { "f0" });
+                       TypeInformation.of(new TypeHint<Tuple2<Long, 
Long>>(){}), TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), new 
String[] { "f0" });
        }
 
        private static class GroupReduce10 implements 
GroupReduceFunction<Tuple2<Long, Long>, Boolean> {
@@ -1082,7 +1096,7 @@ public class UdfAnalyzerTest {
        @Test
        public void testForwardWithAtLeastOneIterationAssumption7() {
                
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class,
 GroupReduce10.class,
-                               "Tuple2<Long, Long>", "Boolean", new String[] { 
"f0" });
+                       TypeInformation.of(new TypeHint<Tuple2<Long, 
Long>>(){}), Types.BOOLEAN, new String[] { "f0" });
        }
 
        @ForwardedFields("field")
@@ -1096,9 +1110,9 @@ public class UdfAnalyzerTest {
        @Test
        public void testForwardWithReduce() {
                
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(ReduceFunction.class, 
Reduce1.class,
-                               
"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
-                               
"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
-                               new String[] { "field" });
+                       TypeInformation.of(new TypeHint<MyPojo>(){}),
+                       TypeInformation.of(new TypeHint<MyPojo>(){}),
+                       new String[] { "field" });
        }
 
        @ForwardedFields("field")
@@ -1115,9 +1129,9 @@ public class UdfAnalyzerTest {
        @Test
        public void testForwardWithBranchingReduce() {
                
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(ReduceFunction.class, 
Reduce2.class,
-                               
"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
-                               
"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
-                               new String[] { "field" });
+                       TypeInformation.of(new TypeHint<MyPojo>(){}),
+                       TypeInformation.of(new TypeHint<MyPojo>(){}),
+                       new String[] { "field" });
        }
 
        private static class NullReturnMapper1 implements MapFunction<String, 
String> {
@@ -1215,7 +1229,7 @@ public class UdfAnalyzerTest {
        public void testFilterModificationException1() {
                try {
                        final UdfAnalyzer ua = new 
UdfAnalyzer(FilterFunction.class, FilterMod1.class, "operator",
-                                       TypeInfoParser.parse("Tuple2<String, 
String>"), null, null, null, null, true);
+                               STRING_STRING_TUPLE2_TYPE_INFO, null, null, 
null, null, true);
                        ua.analyze();
                        Assert.fail();
                }
@@ -1237,7 +1251,7 @@ public class UdfAnalyzerTest {
        public void testFilterModificationException2() {
                try {
                        final UdfAnalyzer ua = new 
UdfAnalyzer(FilterFunction.class, FilterMod2.class, "operator",
-                                       TypeInfoParser.parse("Tuple2<String, 
String>"), null, null, null, null, true);
+                               STRING_STRING_TUPLE2_TYPE_INFO, null, null, 
null, null, true);
                        ua.analyze();
                        Assert.fail();
                }
@@ -1303,17 +1317,14 @@ public class UdfAnalyzerTest {
                }
        }
 
-       public static void 
compareAnalyzerResultWithAnnotationsSingleInput(Class<?> baseClass, Class<?> 
clazz, String in,
-                       String out) {
-               
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(baseClass, clazz, in, 
out, null);
+       public static void 
compareAnalyzerResultWithAnnotationsSingleInput(Class<?> baseClass, Class<?> 
clazz,
+               TypeInformation<?> inType, TypeInformation<?> outType) {
+               
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(baseClass, clazz, 
inType, outType, null);
        }
 
        @SuppressWarnings({ "rawtypes", "unchecked" })
        public static void 
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(Class<?> baseClass, 
Class<?> clazz,
-                       String in, String out, String[] keys) {
-               final TypeInformation<?> inType = TypeInfoParser.parse(in);
-               final TypeInformation<?> outType = TypeInfoParser.parse(out);
-
+               TypeInformation<?> inType, TypeInformation<?> outType, String[] 
keys) {
                // expected
                final Set<Annotation> annotations = 
FunctionAnnotation.readSingleForwardAnnotations(clazz);
                SingleInputSemanticProperties expected = 
SemanticPropUtil.getSemanticPropsSingle(annotations, inType,
@@ -1331,18 +1342,14 @@ public class UdfAnalyzerTest {
                assertEquals(expected.toString(), actual.toString());
        }
 
-       public static void 
compareAnalyzerResultWithAnnotationsDualInput(Class<?> baseClass, Class<?> 
clazz, String in1,
-                       String in2, String out) {
-               
compareAnalyzerResultWithAnnotationsDualInputWithKeys(baseClass, clazz, in1, 
in2, out, null, null);
+       public static void 
compareAnalyzerResultWithAnnotationsDualInput(Class<?> baseClass, Class<?> 
clazz,
+               TypeInformation<?> in1Type, TypeInformation<?> in2Type, 
TypeInformation<?> outType) {
+               
compareAnalyzerResultWithAnnotationsDualInputWithKeys(baseClass, clazz, 
in1Type, in2Type, outType, null, null);
        }
 
        @SuppressWarnings({ "rawtypes", "unchecked" })
        public static void 
compareAnalyzerResultWithAnnotationsDualInputWithKeys(Class<?> baseClass, 
Class<?> clazz,
-                       String in1, String in2, String out, String[] keys1, 
String[] keys2) {
-               final TypeInformation<?> in1Type = TypeInfoParser.parse(in1);
-               final TypeInformation<?> in2Type = TypeInfoParser.parse(in2);
-               final TypeInformation<?> outType = TypeInfoParser.parse(out);
-
+               TypeInformation<?> in1Type, TypeInformation<?> in2Type, 
TypeInformation<?> outType, String[] keys1, String[] keys2) {
                // expected
                final Set<Annotation> annotations = 
FunctionAnnotation.readDualForwardAnnotations(clazz);
                final DualInputSemanticProperties expected = 
SemanticPropUtil.getSemanticPropsDual(annotations, in1Type,

http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb978b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
index a31ce2e..06f486d 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
@@ -18,8 +18,7 @@
 
 package org.apache.flink.graph.asm.dataset;
 
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
@@ -62,7 +61,7 @@ public class ChecksumHashCodeTest {
 
        @Test
        public void testEmptyList() throws Exception {
-               DataSet<Long> dataset = 
env.fromCollection(Collections.emptyList(), TypeInformation.of(new 
TypeHint<Long>(){}));
+               DataSet<Long> dataset = 
env.fromCollection(Collections.emptyList(), Types.LONG);
 
                Checksum checksum = new 
ChecksumHashCode<Long>().run(dataset).execute();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb978b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
index cfeadce..027b809 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
@@ -18,8 +18,7 @@
 
 package org.apache.flink.graph.asm.dataset;
 
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 
@@ -61,7 +60,7 @@ public class CollectTest {
 
        @Test
        public void testEmptyList() throws Exception {
-               DataSet<Long> dataset = 
env.fromCollection(Collections.emptyList(), TypeInformation.of(new 
TypeHint<Long>(){}));
+               DataSet<Long> dataset = 
env.fromCollection(Collections.emptyList(), Types.LONG);
 
                List<Long> collected = new 
Collect<Long>().run(dataset).execute();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb978b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java
index 0167a5f..dc92c55 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java
@@ -18,8 +18,7 @@
 
 package org.apache.flink.graph.asm.dataset;
 
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 
@@ -60,7 +59,7 @@ public class CountTest {
 
        @Test
        public void testEmptyList() throws Exception {
-               DataSet<Long> dataset = 
env.fromCollection(Collections.emptyList(), TypeInformation.of(new 
TypeHint<Long>(){}));
+               DataSet<Long> dataset = 
env.fromCollection(Collections.emptyList(), Types.LONG);
 
                long count = new Count<Long>().run(dataset).execute();
 

Reply via email to