Repository: flink
Updated Branches:
  refs/heads/master 68709b087 -> 4cc38fd36


[FLINK-3042] [FLINK-3060] [types] Define a way to let types create their own 
TypeInformation

This closes #2337.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4cc38fd3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4cc38fd3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4cc38fd3

Branch: refs/heads/master
Commit: 4cc38fd36f3190f9c0066e9cf94580669b2410cf
Parents: 68709b0
Author: twalthr <twal...@apache.org>
Authored: Thu Aug 4 17:01:08 2016 +0200
Committer: twalthr <twal...@apache.org>
Committed: Wed Sep 21 14:12:04 2016 +0200

----------------------------------------------------------------------
 docs/dev/types_serialization.md                 |  42 ++
 .../flink/api/common/typeinfo/TypeInfo.java     |  44 ++
 .../api/common/typeinfo/TypeInfoFactory.java    |  51 ++
 .../api/common/typeinfo/TypeInformation.java    |  24 +-
 .../flink/api/java/typeutils/TupleTypeInfo.java |  13 +-
 .../flink/api/java/typeutils/TypeExtractor.java | 260 ++++++++--
 .../api/java/typeutils/TypeInfoFactoryTest.java | 469 +++++++++++++++++++
 .../flink/api/scala/codegen/TypeAnalyzer.scala  |  25 +
 .../api/scala/codegen/TypeDescriptors.scala     |   8 +
 .../api/scala/codegen/TypeInformationGen.scala  |  22 +
 .../api/scala/typeutils/CaseClassTypeInfo.scala |   6 +-
 .../api/scala/typeutils/EitherTypeInfo.scala    |   3 +-
 .../api/scala/typeutils/EnumValueTypeInfo.scala |   2 +-
 .../api/scala/typeutils/OptionTypeInfo.scala    |   2 +-
 .../scala/typeutils/TraversableTypeInfo.scala   |   3 +-
 .../flink/api/scala/typeutils/TryTypeInfo.scala |   2 +-
 .../scala/typeutils/TypeInfoFactoryTest.scala   | 157 +++++++
 17 files changed, 1079 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/docs/dev/types_serialization.md
----------------------------------------------------------------------
diff --git a/docs/dev/types_serialization.md b/docs/dev/types_serialization.md
index 364aeb8..8a32491 100644
--- a/docs/dev/types_serialization.md
+++ b/docs/dev/types_serialization.md
@@ -251,3 +251,45 @@ env.getConfig().addDefaultKryoSerializer(Class<?> type, 
Class<? extends Serializ
 {% endhighlight %}
 
 There are different variants of these methods available.
+
+## Defining Type Information using a Factory
+
+A type information factory allows for plugging-in user-defined type 
information into the Flink type system.
+You have to implement `org.apache.flink.api.common.typeinfo.TypeInfoFactory` 
to return your custom type information. 
+The factory is called during the type extraction phase if the corresponding 
type has been annotated 
+with the `@org.apache.flink.api.common.typeinfo.TypeInfo` annotation. 
+
+Type information factories can be used in both the Java and Scala API.
+
+In a hierarchy of types the closest factory 
+will be chosen while traversing upwards, however, a built-in factory has 
highest precedence. A factory has 
+also higher precedence than Flink's built-in types, therefore you should know 
what you are doing.
+
+The following example shows how to annotate a custom type `MyTuple` and supply 
custom type information for it using a factory in Java.
+
+The annotated custom type:
+{% highlight java %}
+@TypeInfo(MyTupleTypeInfoFactory.class)
+public class MyTuple<T0, T1> {
+  public T0 myfield0;
+  public T1 myfield1;
+}
+{% endhighlight %}
+
+The factory supplying custom type information:
+{% highlight java %}
+public class MyTupleTypeInfoFactory extends TypeInfoFactory<MyTuple> {
+
+  @Override
+  public TypeInformation<MyTuple> createTypeInfo(Type t, Map<String, 
TypeInformation<?>> genericParameters) {
+    return new MyTupleTypeInfo(genericParameters.get("T0"), 
genericParameters.get("T1"));
+  }
+}
+{% endhighlight %}
+
+The method `createTypeInfo(Type, Map<String, TypeInformation<?>>)` creates 
type information for the type the factory is targeted for. 
+The parameters provide additional information about the type itself as well as 
the type's generic type parameters if available.
+
+If your type contains generic parameters that might need to be derived from 
the input type of a Flink function, make sure to also 
+implement 
`org.apache.flink.api.common.typeinfo.TypeInformation#getGenericParameters` for 
a bidirectional mapping of generic 
+parameters to type information.

http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfo.java 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfo.java
new file mode 100644
index 0000000..ce46827
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfo.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.lang.reflect.Type;
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+/**
+ * Annotation for specifying a corresponding {@link TypeInfoFactory} that can 
produce
+ * {@link TypeInformation} for the annotated type. In a hierarchy of types the 
closest annotation
+ * that defines a factory will be chosen while traversing upwards, however, a 
globally registered
+ * factory has highest precedence (see {@link 
TypeExtractor#registerFactory(Type, Class)}).
+ */
+@Documented
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Public
+public @interface TypeInfo {
+
+       Class<? extends TypeInfoFactory> value();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfoFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfoFactory.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfoFactory.java
new file mode 100644
index 0000000..ea15f3a
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfoFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import java.lang.reflect.Type;
+import java.util.Map;
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+/**
+ * Base class for implementing a type information factory. A type information 
factory allows for
+ * plugging-in user-defined {@link TypeInformation} into the Flink type 
system. The factory is
+ * called during the type extraction phase if the corresponding type has been 
annotated with
+ * {@link TypeInfo}. In a hierarchy of types the closest factory will be 
chosen while traversing
+ * upwards, however, a globally registered factory has highest precedence
+ * (see {@link TypeExtractor#registerFactory(Type, Class)}).
+ *
+ * @param <T> type for which {@link TypeInformation} is created
+ */
+@Public
+public abstract class TypeInfoFactory<T> {
+
+       /**
+        * Creates type information for the type the factory is targeted for. 
The parameters provide
+        * additional information about the type itself as well as the type's 
generic type parameters.
+        *
+        * @param t the exact type the type information is created for; might 
also be a subclass of &lt;T&gt;
+        * @param genericParameters mapping of the type's generic type 
parameters to type information
+        *                          extracted with Flink's type extraction 
facilities; null values
+        *                          indicate that type information could not be 
extracted for this parameter
+        * @return type information for the type the factory is targeted for
+        */
+       public abstract TypeInformation<T> createTypeInfo(Type t, Map<String, 
TypeInformation<?>> genericParameters);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
index 95eed6b..154ceb1 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
@@ -18,15 +18,16 @@
 
 package org.apache.flink.api.common.typeinfo;
 
+import java.util.Map;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 
 import java.io.Serializable;
 import java.util.Collections;
-import java.util.List;
 
 /**
  * TypeInformation is the core class of Flink's type system. Flink requires a 
type information
@@ -122,14 +123,25 @@ public abstract class TypeInformation<T> implements 
Serializable {
        public abstract Class<T> getTypeClass();
 
        /**
-        * Returns the generic parameters of this type.
+        * Optional method for giving Flink's type extraction system 
information about the mapping
+        * of a generic type parameter to the type information of a subtype. 
This information is necessary
+        * in cases where type information should be deduced from an input type.
         *
-        * @return The list of generic parameters. This list can be empty.
+        * For instance, a method for a {@link Tuple2} would look like this:
+        * <code>
+        * Map m = new HashMap();
+        * m.put("T0", this.getTypeAt(0));
+        * m.put("T1", this.getTypeAt(1));
+        * return m;
+        * </code>
+        *
+        * @return map of inferred subtypes; it does not have to contain all 
generic parameters as key;
+        *         values may be null if type could not be inferred
         */
        @PublicEvolving
-       public List<TypeInformation<?>> getGenericParameters() {
-               // Return an empty list as the default implementation
-               return Collections.emptyList();
+       public Map<String, TypeInformation<?>> getGenericParameters() {
+               // return an empty map as the default implementation
+               return Collections.emptyMap();
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
index d525ffb..e2cd789 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
@@ -22,6 +22,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -169,7 +171,16 @@ public final class TupleTypeInfo<T extends Tuple> extends 
TupleTypeInfoBase<T> {
                        );
                }
        }
-       
+
+       @Override
+       public Map<String, TypeInformation<?>> getGenericParameters() {
+               Map<String, TypeInformation<?>> m = new HashMap<>(types.length);
+               for (int i = 0; i < types.length; i++) {
+                       m.put("T" + i, types[i]);
+               }
+               return m;
+       }
+
        // 
--------------------------------------------------------------------------------------------
        
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index a722d72..a0b09f5 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -28,9 +28,12 @@ import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.lang.reflect.TypeVariable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.HashMap;
 import java.util.List;
 
+import java.util.Map;
 import org.apache.avro.specific.SpecificRecordBase;
 
 import org.apache.commons.lang3.ClassUtils;
@@ -56,6 +59,8 @@ import 
org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -63,7 +68,8 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.types.Either;
 import org.apache.flink.types.Value;
-
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -110,6 +116,34 @@ public class TypeExtractor {
        }
 
        // 
--------------------------------------------------------------------------------------------
+       //  TypeInfoFactory registry
+       // 
--------------------------------------------------------------------------------------------
+
+       private static Map<Type, Class<? extends TypeInfoFactory>> 
registeredTypeInfoFactories = new HashMap<>();
+
+       /**
+        * Registers a type information factory globally for a certain type. 
Every following type extraction
+        * operation will use the provided factory for this type. The factory 
will have highest precedence
+        * for this type. In a hierarchy of types the registered factory has 
higher precedence than annotations
+        * at the same level but lower precedence than factories defined down 
the hierarchy.
+        *
+        * @param t type for which a new factory is registered
+        * @param factory type information factory that will produce {@link 
TypeInformation}
+        */
+       private static void registerFactory(Type t, Class<? extends 
TypeInfoFactory> factory) {
+               Preconditions.checkNotNull(t, "Type parameter must not be 
null.");
+               Preconditions.checkNotNull(factory, "Factory parameter must not 
be null.");
+
+               if (!TypeInfoFactory.class.isAssignableFrom(factory)) {
+                       throw new IllegalArgumentException("Class is not a 
TypeInfoFactory.");
+               }
+               if (registeredTypeInfoFactories.containsKey(t)) {
+                       throw new InvalidTypesException("A TypeInfoFactory for 
type '" + t + "' is already registered.");
+               }
+               registeredTypeInfoFactories.put(t, factory);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
        //  Function specific methods
        // 
--------------------------------------------------------------------------------------------
 
@@ -592,9 +626,14 @@ public class TypeExtractor {
        @SuppressWarnings({ "unchecked", "rawtypes" })
        private <IN1, IN2, OUT> TypeInformation<OUT> 
createTypeInfoWithTypeHierarchy(ArrayList<Type> typeHierarchy, Type t,
                        TypeInformation<IN1> in1Type, TypeInformation<IN2> 
in2Type) {
-               
+
+               // check if type information can be created using a type factory
+               final TypeInformation<OUT> typeFromFactory = 
createTypeInfoFromFactory(t, typeHierarchy, in1Type, in2Type);
+               if (typeFromFactory != null) {
+                       return typeFromFactory;
+               }
                // check if type is a subclass of tuple
-               if (isClassType(t) && 
Tuple.class.isAssignableFrom(typeToClass(t))) {
+               else if (isClassType(t) && 
Tuple.class.isAssignableFrom(typeToClass(t))) {
                        Type curT = t;
                        
                        // do not allow usage of Tuple as type
@@ -622,7 +661,7 @@ public class TypeExtractor {
                        typeHierarchy.add(curT);
 
                        // create the type information for the subtypes
-                       TypeInformation<?>[] subTypesInfo = 
createSubTypesInfo(t, (ParameterizedType) curT, typeHierarchy, in1Type, 
in2Type);
+                       final TypeInformation<?>[] subTypesInfo = 
createSubTypesInfo(t, (ParameterizedType) curT, typeHierarchy, in1Type, 
in2Type, false);
                        // type needs to be treated a pojo due to additional 
fields
                        if (subTypesInfo == null) {
                                if (t instanceof ParameterizedType) {
@@ -655,7 +694,7 @@ public class TypeExtractor {
                        typeHierarchy.add(curT);
 
                        // create the type information for the subtypes
-                       TypeInformation<?>[] subTypesInfo = 
createSubTypesInfo(t, (ParameterizedType) curT, typeHierarchy, in1Type, 
in2Type);
+                       final TypeInformation<?>[] subTypesInfo = 
createSubTypesInfo(t, (ParameterizedType) curT, typeHierarchy, in1Type, 
in2Type, false);
                        // type needs to be treated a pojo due to additional 
fields
                        if (subTypesInfo == null) {
                                if (t instanceof ParameterizedType) {
@@ -807,12 +846,40 @@ public class TypeExtractor {
 
                return null;
        }
-       
+
+       @SuppressWarnings({"unchecked", "rawtypes"})
        private <IN1> TypeInformation<?> 
createTypeInfoFromInput(TypeVariable<?> returnTypeVar, ArrayList<Type> 
inputTypeHierarchy, Type inType, TypeInformation<IN1> inTypeInfo) {
                TypeInformation<?> info = null;
-               
+
+               // use a factory to find corresponding type information to type 
variable
+               final ArrayList<Type> factoryHierarchy = new 
ArrayList<>(inputTypeHierarchy);
+               final TypeInfoFactory<?> factory = 
getClosestFactory(factoryHierarchy, inType);
+               if (factory != null) {
+                       // the type that defines the factory is last in factory 
hierarchy
+                       final Type factoryDefiningType = 
factoryHierarchy.get(factoryHierarchy.size() - 1);
+                       // defining type has generics, the factory needs to be 
asked for a mapping of subtypes to type information
+                       if (factoryDefiningType instanceof ParameterizedType) {
+                               final Type[] typeParams = 
typeToClass(factoryDefiningType).getTypeParameters();
+                               final Type[] actualParams = 
((ParameterizedType) factoryDefiningType).getActualTypeArguments();
+                               // go through all elements and search for type 
variables
+                               for (int i = 0; i < actualParams.length; i++) {
+                                       final Map<String, TypeInformation<?>> 
componentInfo = inTypeInfo.getGenericParameters();
+                                       final String typeParamName = 
typeParams[i].toString();
+                                       if 
(!componentInfo.containsKey(typeParamName) || componentInfo.get(typeParamName) 
== null) {
+                                               throw new 
InvalidTypesException("TypeInformation '" + 
inTypeInfo.getClass().getSimpleName() +
+                                                       "' does not supply a 
mapping of TypeVariable '" + typeParamName + "' to corresponding 
TypeInformation. " +
+                                                       "Input type inference 
can only produce a result with this information. " +
+                                                       "Please implement 
method 'TypeInformation.getGenericParameters()' for this.");
+                                       }
+                                       info = 
createTypeInfoFromInput(returnTypeVar, factoryHierarchy, actualParams[i], 
componentInfo.get(typeParamName));
+                                       if (info != null) {
+                                               break;
+                                       }
+                               }
+                       }
+               }
                // the input is a type variable
-               if (inType instanceof TypeVariable) {
+               else if (inType instanceof TypeVariable) {
                        inType = materializeTypeVariable(inputTypeHierarchy, 
(TypeVariable<?>) inType);
                        info = findCorrespondingInfo(returnTypeVar, inType, 
inTypeInfo, inputTypeHierarchy);
                }
@@ -873,28 +940,30 @@ public class TypeExtractor {
         * @param typeHierarchy necessary for type inference
         * @param in1Type necessary for type inference
         * @param in2Type necessary for type inference
+        * @param lenient decides whether exceptions should be thrown if a 
subtype cannot be determined
         * @return array containing TypeInformation of sub types or null if 
definingType contains
         *     more subtypes (fields) that defined
         */
        private <IN1, IN2> TypeInformation<?>[] createSubTypesInfo(Type 
originalType, ParameterizedType definingType,
-                       ArrayList<Type> typeHierarchy, TypeInformation<IN1> 
in1Type, TypeInformation<IN2> in2Type) {
+                       ArrayList<Type> typeHierarchy, TypeInformation<IN1> 
in1Type, TypeInformation<IN2> in2Type, boolean lenient) {
                Type[] subtypes = new 
Type[definingType.getActualTypeArguments().length];
 
                // materialize possible type variables
                for (int i = 0; i < subtypes.length; i++) {
+                       final Type actualTypeArg = 
definingType.getActualTypeArguments()[i];
                        // materialize immediate TypeVariables
-                       if (definingType.getActualTypeArguments()[i] instanceof 
TypeVariable<?>) {
-                               subtypes[i] = 
materializeTypeVariable(typeHierarchy, (TypeVariable<?>) 
definingType.getActualTypeArguments()[i]);
+                       if (actualTypeArg instanceof TypeVariable<?>) {
+                               subtypes[i] = 
materializeTypeVariable(typeHierarchy, (TypeVariable<?>) actualTypeArg);
                        }
                        // class or parameterized type
                        else {
-                               subtypes[i] = 
definingType.getActualTypeArguments()[i];
+                               subtypes[i] = actualTypeArg;
                        }
                }
 
                TypeInformation<?>[] subTypesInfo = new 
TypeInformation<?>[subtypes.length];
                for (int i = 0; i < subtypes.length; i++) {
-                       ArrayList<Type> subTypeHierarchy = new 
ArrayList<Type>(typeHierarchy);
+                       final ArrayList<Type> subTypeHierarchy = new 
ArrayList<>(typeHierarchy);
                        subTypeHierarchy.add(subtypes[i]);
                        // sub type could not be determined with materializing
                        // try to derive the type info of the TypeVariable from 
the immediate base child input as a last attempt
@@ -902,7 +971,7 @@ public class TypeExtractor {
                                subTypesInfo[i] = 
createTypeInfoFromInputs((TypeVariable<?>) subtypes[i], subTypeHierarchy, 
in1Type, in2Type);
 
                                // variable could not be determined
-                               if (subTypesInfo[i] == null) {
+                               if (subTypesInfo[i] == null && !lenient) {
                                        throw new InvalidTypesException("Type 
of TypeVariable '" + ((TypeVariable<?>) subtypes[i]).getName() + "' in '"
                                                        + ((TypeVariable<?>) 
subtypes[i]).getGenericDeclaration()
                                                        + "' could not be 
determined. This is most likely a type erasure problem. "
@@ -910,25 +979,75 @@ public class TypeExtractor {
                                                        + "all variables in the 
return type can be deduced from the input type(s).");
                                }
                        } else {
-                               subTypesInfo[i] = 
createTypeInfoWithTypeHierarchy(subTypeHierarchy, subtypes[i], in1Type, 
in2Type);
+                               // create the type information of the subtype 
or null/exception
+                               try {
+                                       subTypesInfo[i] = 
createTypeInfoWithTypeHierarchy(subTypeHierarchy, subtypes[i], in1Type, 
in2Type);
+                               } catch (InvalidTypesException e) {
+                                       if (lenient) {
+                                               subTypesInfo[i] = null;
+                                       } else {
+                                               throw e;
+                                       }
+                               }
                        }
                }
 
-               Class<?> originalTypeAsClass = null;
-               if (isClassType(originalType)) {
-                       originalTypeAsClass = typeToClass(originalType);
+               // check that number of fields matches the number of subtypes
+               if (!lenient) {
+                       Class<?> originalTypeAsClass = null;
+                       if (isClassType(originalType)) {
+                               originalTypeAsClass = typeToClass(originalType);
+                       }
+                       checkNotNull(originalTypeAsClass, "originalType has an 
unexpected type");
+                       // check if the class we assumed to conform to the 
defining type so far is actually a pojo because the
+                       // original type contains additional fields.
+                       // check for additional fields.
+                       int fieldCount = 
countFieldsInClass(originalTypeAsClass);
+                       if(fieldCount > subTypesInfo.length) {
+                               return null;
+                       }
                }
-               checkNotNull(originalTypeAsClass, "originalType has an 
unexpected type");
-               // check if the class we assumed to conform to the defining 
type so far is actually a pojo because the
-               // original type contains additional fields.
-               // check for additional fields.
-               int fieldCount = countFieldsInClass(originalTypeAsClass);
-               if(fieldCount > subTypesInfo.length) {
+
+               return subTypesInfo;
+       }
+
+       /**
+        * Creates type information using a factory if one is defined for this type or its super 
types. Returns null otherwise.
+        */
+       @SuppressWarnings("unchecked")
+       private <IN1, IN2, OUT> TypeInformation<OUT> createTypeInfoFromFactory(
+                       Type t, ArrayList<Type> typeHierarchy, 
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
+
+               final ArrayList<Type> factoryHierarchy = new 
ArrayList<>(typeHierarchy);
+               final TypeInfoFactory<? super OUT> factory = 
getClosestFactory(factoryHierarchy, t);
+               if (factory == null) {
                        return null;
                }
-               return subTypesInfo;
+               final Type factoryDefiningType = 
factoryHierarchy.get(factoryHierarchy.size() - 1);
+
+               // infer possible type parameters from input
+               final Map<String, TypeInformation<?>> genericParams;
+               if (factoryDefiningType instanceof ParameterizedType) {
+                       genericParams = new HashMap<>();
+                       final ParameterizedType paramDefiningType = 
(ParameterizedType) factoryDefiningType;
+                       final Type[] args = 
typeToClass(paramDefiningType).getTypeParameters();
+
+                       final TypeInformation<?>[] subtypeInfo = 
createSubTypesInfo(t, paramDefiningType, factoryHierarchy, in1Type, in2Type, 
true);
+                       assert subtypeInfo != null;
+                       for (int i = 0; i < subtypeInfo.length; i++) {
+                               genericParams.put(args[i].toString(), 
subtypeInfo[i]);
+                       }
+               } else {
+                       genericParams = Collections.emptyMap();
+               }
+
+               final TypeInformation<OUT> createdTypeInfo = 
(TypeInformation<OUT>) factory.createTypeInfo(t, genericParams);
+               if (createdTypeInfo == null) {
+                       throw new InvalidTypesException("TypeInfoFactory 
returned invalid TypeInformation 'null'");
+               }
+               return createdTypeInfo;
        }
-       
+
        // 
--------------------------------------------------------------------------------------------
        //  Extract type parameters
        // 
--------------------------------------------------------------------------------------------
@@ -1254,6 +1373,31 @@ public class TypeExtractor {
        // 
--------------------------------------------------------------------------------------------
 
        /**
+        * Returns the type information factory for a type using the factory registry or annotations.
+        *
+        * <p>A factory class found for the type in {@code registeredTypeInfoFactories} takes
+        * precedence and is used without the validity check below. Otherwise the type has to be a
+        * class type annotated with {@link TypeInfo}; the annotation value is taken as factory class.
+        *
+        * @param t type for which a factory is looked up
+        * @return the factory obtained from {@code InstantiationUtil.instantiate}, or {@code null} if
+        *         no factory class is registered for the type and the type is not a class type
+        *         carrying a {@link TypeInfo} annotation
+        * @throws InvalidTypesException if the annotation value is not assignable to
+        *         {@link TypeInfoFactory}
+        */
+       @Internal
+       public static <OUT> TypeInfoFactory<OUT> getTypeInfoFactory(Type t) {
+               final Class<?> factoryClass;
+               if (registeredTypeInfoFactories.containsKey(t)) {
+                       factoryClass = registeredTypeInfoFactories.get(t);
+               }
+               else {
+                       if (!isClassType(t) || 
!typeToClass(t).isAnnotationPresent(TypeInfo.class)) {
+                               return null;
+                       }
+                       final TypeInfo typeInfoAnnotation = 
typeToClass(t).getAnnotation(TypeInfo.class);
+                       factoryClass = typeInfoAnnotation.value();
+                       // check for valid factory class
+                       if 
(!TypeInfoFactory.class.isAssignableFrom(factoryClass)) {
+                               throw new InvalidTypesException("TypeInfo 
annotation does not specify a valid TypeInfoFactory.");
+                       }
+               }
+
+               // instantiate
+               return (TypeInfoFactory<OUT>) 
InstantiationUtil.instantiate(factoryClass);
+       }
+
+       /**
         * @return number of items with equal type or same raw type
         */
        private static int countTypeInHierarchy(ArrayList<Type> typeHierarchy, 
Type type) {
@@ -1265,27 +1409,46 @@ public class TypeExtractor {
                }
                return count;
        }
-       
+
        /**
-        * @param curT : start type
-        * @return Type The immediate child of the top class
+        * Traverses the type hierarchy of a type up until a certain stop class 
is found.
+        *
+        * @param t type for which a hierarchy needs to be created
+        * @return type of the immediate child of the stop class
         */
-       private static Type getTypeHierarchy(ArrayList<Type> typeHierarchy, 
Type curT, Class<?> stopAtClass) {
-               // skip first one
-               if (typeHierarchy.size() > 0 && typeHierarchy.get(0) == curT && 
isClassType(curT)) {
-                       curT = typeToClass(curT).getGenericSuperclass();
+       private static Type getTypeHierarchy(ArrayList<Type> typeHierarchy, 
Type t, Class<?> stopAtClass) {
+               while (!(isClassType(t) && typeToClass(t).equals(stopAtClass))) 
{
+                       typeHierarchy.add(t);
+                       t = typeToClass(t).getGenericSuperclass();
+
+                       if (t == null) {
+                               break;
+                       }
                }
-               while (!(isClassType(curT) && 
typeToClass(curT).equals(stopAtClass))) {
-                       typeHierarchy.add(curT);
-                       curT = typeToClass(curT).getGenericSuperclass();
+               return t;
+       }
 
-                       if (curT == null) {
+       /**
+        * Traverses the type hierarchy up until a type information factory can 
be found.
+        *
+        * @param typeHierarchy hierarchy to be filled while traversing up
+        * @param t type for which a factory needs to be found
+        * @return closest type information factory or null if there is no 
factory in the type hierarchy
+        */
+       private static <OUT> TypeInfoFactory<? super OUT> 
getClosestFactory(ArrayList<Type> typeHierarchy, Type t) {
+               TypeInfoFactory factory = null;
+               while (factory == null && isClassType(t) && 
!(typeToClass(t).equals(Object.class))) {
+                       typeHierarchy.add(t);
+                       factory = getTypeInfoFactory(t);
+                       t = typeToClass(t).getGenericSuperclass();
+
+                       if (t == null) {
                                break;
                        }
                }
-               return curT;
+               return factory;
        }
-       
+
        private int countFieldsInClass(Class<?> clazz) {
                int fieldCount = 0;
                for(Field field : clazz.getFields()) { // get all fields
@@ -1486,17 +1649,26 @@ public class TypeExtractor {
         * @return TypeInformation that describes the passed Class
         */
        public static <X> TypeInformation<X> getForClass(Class<X> clazz) {
-               return new TypeExtractor().privateGetForClass(clazz, new 
ArrayList<Type>());
+               final ArrayList<Type> typeHierarchy = new ArrayList<>();
+               typeHierarchy.add(clazz);
+               return new TypeExtractor().privateGetForClass(clazz, 
typeHierarchy);
        }
        
        private <X> TypeInformation<X> privateGetForClass(Class<X> clazz, 
ArrayList<Type> typeHierarchy) {
                return privateGetForClass(clazz, typeHierarchy, null, null, 
null);
        }
+
        @SuppressWarnings({ "unchecked", "rawtypes" })
        private <OUT,IN1,IN2> TypeInformation<OUT> 
privateGetForClass(Class<OUT> clazz, ArrayList<Type> typeHierarchy,
                        ParameterizedType parameterizedType, 
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
                checkNotNull(clazz);
 
+               // check if type information can be produced using a factory
+               final TypeInformation<OUT> typeFromFactory = 
createTypeInfoFromFactory(clazz, typeHierarchy, in1Type, in2Type);
+               if (typeFromFactory != null) {
+                       return typeFromFactory;
+               }
+
                // Object is handled as generic type info
                if (clazz.equals(Object.class)) {
                        return new GenericTypeInfo<>(clazz);
@@ -1859,6 +2031,14 @@ public class TypeExtractor {
        private <X> TypeInformation<X> privateGetForObject(X value) {
                checkNotNull(value);
 
+               // check if type information can be produced using a factory
+               final ArrayList<Type> typeHierarchy = new ArrayList<>();
+               typeHierarchy.add(value.getClass());
+               final TypeInformation<X> typeFromFactory = 
createTypeInfoFromFactory(value.getClass(), typeHierarchy, null, null);
+               if (typeFromFactory != null) {
+                       return typeFromFactory;
+               }
+
                // check if we can extract the types from tuples, otherwise 
work with the class
                if (value instanceof Tuple) {
                        Tuple t = (Tuple) value;

http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoFactoryTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoFactoryTest.java
new file mode 100644
index 0000000..f055879
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoFactoryTest.java
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Test;
+
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.BOOLEAN_TYPE_INFO;
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.DOUBLE_TYPE_INFO;
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.FLOAT_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO;
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for extracting {@link 
org.apache.flink.api.common.typeinfo.TypeInformation} from types
+ * using a {@link org.apache.flink.api.common.typeinfo.TypeInfoFactory}
+ */
+public class TypeInfoFactoryTest {
+
+       @Test
+       public void testSimpleType() {
+               TypeInformation<?> ti = 
TypeExtractor.createTypeInfo(IntLike.class);
+               assertEquals(INT_TYPE_INFO, ti);
+
+               ti = TypeExtractor.getForClass(IntLike.class);
+               assertEquals(INT_TYPE_INFO, ti);
+
+               ti = TypeExtractor.getForObject(new IntLike());
+               assertEquals(INT_TYPE_INFO, ti);
+       }
+
+       @Test
+       public void testMyEitherGenericType() {
+               MapFunction<Boolean, MyEither<Boolean, String>> f = new 
MyEitherMapper<>();
+               TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, 
BOOLEAN_TYPE_INFO);
+               assertTrue(ti instanceof EitherTypeInfo);
+               EitherTypeInfo eti = (EitherTypeInfo) ti;
+               assertEquals(BOOLEAN_TYPE_INFO, eti.getLeftType());
+               assertEquals(STRING_TYPE_INFO, eti.getRightType());
+       }
+
+       @Test
+       public void testMyOptionGenericType() {
+               TypeInformation<MyOption<Tuple2<Boolean, String>>> inTypeInfo = 
new MyOptionTypeInfo<>(
+                       new TupleTypeInfo<Tuple2<Boolean, 
String>>(BOOLEAN_TYPE_INFO, STRING_TYPE_INFO));
+               MapFunction<MyOption<Tuple2<Boolean, String>>, 
MyOption<Tuple2<Boolean, Boolean>>> f = new MyOptionMapper<>();
+               TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, 
inTypeInfo);
+               assertTrue(ti instanceof MyOptionTypeInfo);
+               MyOptionTypeInfo oti = (MyOptionTypeInfo) ti;
+               assertTrue(oti.getInnerType() instanceof TupleTypeInfo);
+               TupleTypeInfo tti = (TupleTypeInfo) oti.getInnerType();
+               assertEquals(BOOLEAN_TYPE_INFO, tti.getTypeAt(0));
+               assertEquals(BOOLEAN_TYPE_INFO, tti.getTypeAt(1));
+       }
+
+       @Test
+       public void testMyTuple() {
+               TypeInformation<Tuple1<MyTuple<Double, String>>> inTypeInfo = 
new TupleTypeInfo<>(
+                       new MyTupleTypeInfo(DOUBLE_TYPE_INFO, 
STRING_TYPE_INFO));
+               MapFunction<Tuple1<MyTuple<Double, String>>, 
Tuple1<MyTuple<Boolean, Double>>> f = new MyTupleMapperL2<>();
+               TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, 
inTypeInfo);
+               assertTrue(ti instanceof TupleTypeInfo);
+               TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti;
+               assertTrue(tti.getTypeAt(0) instanceof MyTupleTypeInfo);
+               MyTupleTypeInfo mtti = (MyTupleTypeInfo) tti.getTypeAt(0);
+               assertEquals(BOOLEAN_TYPE_INFO, mtti.getField0());
+               assertEquals(DOUBLE_TYPE_INFO, mtti.getField1());
+       }
+
+       @Test
+       public void testMyTupleHierarchy() {
+               TypeInformation<?> ti = 
TypeExtractor.createTypeInfo(MyTuple2.class);
+               assertTrue(ti instanceof MyTupleTypeInfo);
+               MyTupleTypeInfo<?, ?> mtti = (MyTupleTypeInfo) ti;
+               assertEquals(STRING_TYPE_INFO, mtti.getField0());
+               assertEquals(BOOLEAN_TYPE_INFO, mtti.getField1());
+       }
+
+       @Test
+       public void testMyTupleHierarchyWithInference() {
+               TypeInformation<Tuple1<MyTuple3<Tuple1<Float>>>> inTypeInfo = 
new TupleTypeInfo<>(new MyTupleTypeInfo<>(
+                       new TupleTypeInfo<Tuple1<Float>>(FLOAT_TYPE_INFO), 
BOOLEAN_TYPE_INFO));
+               MapFunction<Tuple1<MyTuple3<Tuple1<Float>>>, 
Tuple1<MyTuple3<Tuple2<Float, String>>>> f = new MyTuple3Mapper<>();
+               TypeInformation ti = TypeExtractor.getMapReturnTypes(f, 
inTypeInfo);
+               assertTrue(ti instanceof TupleTypeInfo);
+               TupleTypeInfo<?> tti = (TupleTypeInfo) ti;
+               assertTrue(tti.getTypeAt(0) instanceof MyTupleTypeInfo);
+               MyTupleTypeInfo mtti = (MyTupleTypeInfo) tti.getTypeAt(0);
+               assertEquals(new TupleTypeInfo<>(FLOAT_TYPE_INFO, 
STRING_TYPE_INFO), mtti.getField0());
+               assertEquals(BOOLEAN_TYPE_INFO, mtti.getField1());
+       }
+
+       @Test(expected = InvalidTypesException.class)
+       public void testMissingTypeInfo() {
+               MapFunction f = new MyFaultyMapper();
+               TypeExtractor.getMapReturnTypes(f, INT_TYPE_INFO);
+       }
+
+       @Test(expected = InvalidTypesException.class)
+       public void testMissingTypeInference() {
+               MapFunction f = new MyFaultyMapper2();
+               TypeExtractor.getMapReturnTypes(f, new MyFaultyTypeInfo());
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Utilities
+       // 
--------------------------------------------------------------------------------------------
+
+       public static class MyTuple3Mapper<Y> implements 
MapFunction<Tuple1<MyTuple3<Tuple1<Y>>>, Tuple1<MyTuple3<Tuple2<Y, String>>>> {
+               @Override
+               public Tuple1<MyTuple3<Tuple2<Y, String>>> 
map(Tuple1<MyTuple3<Tuple1<Y>>> value) throws Exception {
+                       return null;
+               }
+       }
+
+       public static class MyTuple3<T> extends MyTuple<T, Boolean> {
+               // empty
+       }
+
+       public static class MyTuple2 extends MyTuple<String, Boolean> {
+               // empty
+       }
+
+       public static class MyFaultyMapper2<T> implements 
MapFunction<MyFaulty<T>, MyFaulty<T>> {
+               @Override
+               public MyFaulty<T> map(MyFaulty<T> value) throws Exception {
+                       return null;
+               }
+       }
+
+       public static class MyFaultyMapper<T> implements MapFunction<T, 
MyFaulty<T>> {
+               @Override
+               public MyFaulty<T> map(T value) throws Exception {
+                       return null;
+               }
+       }
+
+       @TypeInfo(FaultyTypeInfoFactory.class)
+       public static class MyFaulty<Y> {
+               // empty
+       }
+
+       public static class FaultyTypeInfoFactory extends TypeInfoFactory {
+               @Override
+               public TypeInformation createTypeInfo(Type t, Map 
genericParameters) {
+                       return null;
+               }
+       }
+
+       public static class MyFaultyTypeInfo extends TypeInformation<MyFaulty> {
+               @Override
+               public boolean isBasicType() {
+                       return false;
+               }
+
+               @Override
+               public boolean isTupleType() {
+                       return false;
+               }
+
+               @Override
+               public int getArity() {
+                       return 0;
+               }
+
+               @Override
+               public int getTotalFields() {
+                       return 0;
+               }
+
+               @Override
+               public Class<MyFaulty> getTypeClass() {
+                       return null;
+               }
+
+               @Override
+               public boolean isKeyType() {
+                       return false;
+               }
+
+               @Override
+               public TypeSerializer<MyFaulty> 
createSerializer(ExecutionConfig config) {
+                       return null;
+               }
+
+               @Override
+               public String toString() {
+                       return null;
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       return false;
+               }
+
+               @Override
+               public int hashCode() {
+                       return 0;
+               }
+
+               @Override
+               public boolean canEqual(Object obj) {
+                       return false;
+               }
+       }
+
+       public static class MyTupleMapperL1<A, B> implements 
MapFunction<Tuple1<MyTuple<A, String>>, Tuple1<MyTuple<B, A>>> {
+               @Override
+               public Tuple1<MyTuple<B, A>> map(Tuple1<MyTuple<A, String>> 
value) throws Exception {
+                       return null;
+               }
+       }
+
+       public static class MyTupleMapperL2<C> extends MyTupleMapperL1<C, 
Boolean> {
+               // empty
+       }
+
+       @TypeInfo(MyTupleTypeInfoFactory.class)
+       public static class MyTuple<T0, T1> {
+               // empty
+       }
+
+       public static class MyTupleTypeInfoFactory extends 
TypeInfoFactory<MyTuple> {
+               @Override
+               @SuppressWarnings("unchecked")
+               public TypeInformation<MyTuple> createTypeInfo(Type t, 
Map<String, TypeInformation<?>> genericParameters) {
+                       return new MyTupleTypeInfo(genericParameters.get("T0"), 
genericParameters.get("T1"));
+               }
+       }
+
+       public static class MyTupleTypeInfo<T0, T1> extends 
TypeInformation<MyTuple<T0, T1>> {
+               private TypeInformation field0;
+               private TypeInformation field1;
+
+               public TypeInformation getField0() {
+                       return field0;
+               }
+
+               public TypeInformation getField1() {
+                       return field1;
+               }
+
+               public MyTupleTypeInfo(TypeInformation field0, TypeInformation 
field1) {
+                       this.field0 = field0;
+                       this.field1 = field1;
+               }
+
+               @Override
+               public boolean isBasicType() {
+                       return false;
+               }
+
+               @Override
+               public boolean isTupleType() {
+                       return false;
+               }
+
+               @Override
+               public int getArity() {
+                       return 0;
+               }
+
+               @Override
+               public int getTotalFields() {
+                       return 0;
+               }
+
+               @Override
+               public Class<MyTuple<T0, T1>> getTypeClass() {
+                       return null;
+               }
+
+               @Override
+               public boolean isKeyType() {
+                       return false;
+               }
+
+               @Override
+               public TypeSerializer<MyTuple<T0, T1>> 
createSerializer(ExecutionConfig config) {
+                       return null;
+               }
+
+               @Override
+               public String toString() {
+                       return null;
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       return false;
+               }
+
+               @Override
+               public int hashCode() {
+                       return 0;
+               }
+
+               @Override
+               public boolean canEqual(Object obj) {
+                       return false;
+               }
+
+               @Override
+               public Map<String, TypeInformation<?>> getGenericParameters() {
+                       Map<String, TypeInformation<?>> map = new HashMap<>(2);
+                       map.put("T0", field0);
+                       map.put("T1", field1);
+                       return map;
+               }
+       }
+
+       public static class MyOptionMapper<T> implements 
MapFunction<MyOption<Tuple2<T, String>>, MyOption<Tuple2<T, T>>> {
+               @Override
+               public MyOption<Tuple2<T, T>> map(MyOption<Tuple2<T, String>> 
value) throws Exception {
+                       return null;
+               }
+       }
+
+       @TypeInfo(MyOptionTypeInfoFactory.class)
+       public static class MyOption<T> {
+               // empty
+       }
+
+       public static class MyOptionTypeInfoFactory<T> extends 
TypeInfoFactory<MyOption<T>> {
+               @Override
+               @SuppressWarnings("unchecked")
+               public TypeInformation<MyOption<T>> createTypeInfo(Type t, 
Map<String, TypeInformation<?>> genericParams) {
+                       return new MyOptionTypeInfo(genericParams.get("T"));
+               }
+       }
+
+       public static class MyOptionTypeInfo<T> extends 
TypeInformation<MyOption<T>> {
+
+               private final TypeInformation<T> innerType;
+
+               public MyOptionTypeInfo(TypeInformation<T> innerType) {
+                       this.innerType = innerType;
+               }
+
+               public TypeInformation<T> getInnerType() {
+                       return innerType;
+               }
+
+               @Override
+               public boolean isBasicType() {
+                       return false;
+               }
+
+               @Override
+               public boolean isTupleType() {
+                       return false;
+               }
+
+               @Override
+               public int getArity() {
+                       return 0;
+               }
+
+               @Override
+               public int getTotalFields() {
+                       return 0;
+               }
+
+               @Override
+               public Class<MyOption<T>> getTypeClass() {
+                       return null;
+               }
+
+               @Override
+               public boolean isKeyType() {
+                       return false;
+               }
+
+               @Override
+               public TypeSerializer<MyOption<T>> 
createSerializer(ExecutionConfig config) {
+                       return null;
+               }
+
+               @Override
+               public String toString() {
+                       return null;
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       return false;
+               }
+
+               @Override
+               public int hashCode() {
+                       return 0;
+               }
+
+               @Override
+               public boolean canEqual(Object obj) {
+                       return false;
+               }
+
+               @Override
+               public Map<String, TypeInformation<?>> getGenericParameters() {
+                       Map<String, TypeInformation<?>> map = new HashMap<>(1);
+                       map.put("T", innerType);
+                       return map;
+               }
+       }
+
+       public static class MyEitherMapper<T> implements MapFunction<T, 
MyEither<T, String>> {
+               @Override
+               public MyEither<T, String> map(T value) throws Exception {
+                       return null;
+               }
+       }
+
+       @TypeInfo(MyEitherTypeInfoFactory.class)
+       public static class MyEither<A, B> {
+               // empty
+       }
+
+       public static class MyEitherTypeInfoFactory<A, B> extends 
TypeInfoFactory<MyEither<A, B>> {
+               @Override
+               @SuppressWarnings("unchecked")
+               public TypeInformation<MyEither<A,B>> createTypeInfo(Type t, 
Map<String, TypeInformation<?>> genericParams) {
+                       return new EitherTypeInfo(genericParams.get("A"), 
genericParams.get("B"));
+               }
+       }
+
+       @TypeInfo(IntLikeTypeInfoFactory.class)
+       public static class IntLike {
+               // empty
+       }
+
+       public static class IntLikeTypeInfoFactory extends 
TypeInfoFactory<IntLike> {
+               @Override
+               @SuppressWarnings("unchecked")
+               public TypeInformation<IntLike> createTypeInfo(Type t, 
Map<String, TypeInformation<?>> genericParams) {
+                       return (TypeInformation) INT_TYPE_INFO;
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
index a8587ef..11d5ec7 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
@@ -51,6 +51,9 @@ private[flink] trait TypeAnalyzer[C <: Context] { this: 
MacroContextHolder[C]
 
           case TypeParameter() => TypeParameterDescriptor(id, tpe)
 
+          // type or super type defines type information factory
+          case FactoryType(baseType) => analyzeFactoryType(id, tpe, baseType)
+
           case PrimitiveType(default, wrapper) => PrimitiveDescriptor(id, tpe, 
default, wrapper)
 
           case BoxedPrimitiveType(default, wrapper, box, unbox) =>
@@ -91,6 +94,19 @@ private[flink] trait TypeAnalyzer[C <: Context] { this: 
MacroContextHolder[C]
       }
     }
 
+    /**
+     * Creates the descriptor for a type whose type information is produced by a factory.
+     *
+     * The type arguments of `baseType` are analyzed as well, so that a descriptor is
+     * available for each of them.
+     */
+    private def analyzeFactoryType(
+        id: Int,
+        tpe: Type,
+        baseType: Type): UDTDescriptor = {
+      // only a TypeRef exposes type arguments, every other kind of type contributes none
+      val typeArgDescriptors: Seq[UDTDescriptor] = baseType match {
+        case TypeRef(_, _, typeArgs) => typeArgs.map(typeArg => analyze(typeArg))
+        case _ => Seq.empty[UDTDescriptor]
+      }
+      FactoryTypeDescriptor(id, tpe, baseType, typeArgDescriptors)
+    }
+
     private def analyzeArray(
         id: Int,
         tpe: Type,
@@ -438,6 +454,15 @@ private[flink] trait TypeAnalyzer[C <: Context] { this: 
MacroContextHolder[C]
       def unapply(tpe: Type): Boolean = tpe <:< 
typeOf[org.apache.flink.api.java.tuple.Tuple]
     }
 
+    /**
+     * Extractor for types that have a type information factory.
+     *
+     * Matches if the class of the type, or one of its base classes, is annotated with
+     * `TypeInfo` and yields the given type seen as the first such base class.
+     */
+    private object FactoryType {
+      def unapply(tpe: Type): Option[Type] = {
+        val symbol = tpe.typeSymbol
+        // asClass throws a ScalaReflectionException for symbols that are not classes
+        // (e.g. abstract type members); such types simply do not match
+        if (symbol.isClass) {
+          val definingType = symbol.asClass.baseClasses find {
+            _.annotations.exists(_.tpe =:= typeOf[org.apache.flink.api.common.typeinfo.TypeInfo])
+          }
+          definingType.map(tpe.baseType)
+        } else {
+          None
+        }
+      }
+    }
+
     private class UDTAnalyzerCache {
 
       private val caches = new DynamicVariable[Map[Type, 
RecursiveDescriptor]](Map())

http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeDescriptors.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeDescriptors.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeDescriptors.scala
index 4efa546..9efde0f 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeDescriptors.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeDescriptors.scala
@@ -19,6 +19,7 @@ package org.apache.flink.api.scala.codegen
 
 import org.apache.flink.annotation.Internal
 
+import scala.collection.Map
 import scala.language.postfixOps
 import scala.reflect.macros.Context
 
@@ -53,6 +54,13 @@ private[flink] trait TypeDescriptors[C <: Context] { this: 
MacroContextHolder[C]
 
   case class TryDescriptor(id: Int, tpe: Type, elem: UDTDescriptor) extends 
UDTDescriptor
 
+  case class FactoryTypeDescriptor( // describes a type whose type information comes from a factory
+      id: Int,
+      tpe: Type, // the analyzed type itself
+      baseType: Type, // tpe seen as the base class that carries the TypeInfo annotation
+      params: Seq[UDTDescriptor]) // descriptors of the type arguments of baseType
+    extends UDTDescriptor
+
   case class OptionDescriptor(id: Int, tpe: Type, elem: UDTDescriptor) extends 
UDTDescriptor
 
   case class BoxedPrimitiveDescriptor(

http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
index ee0d167..9736e81 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
@@ -52,6 +52,9 @@ private[flink] trait TypeInformationGen[C <: Context] {
   // We have this for internal use so that we can use it to recursively 
generate a tree of
   // TypeInformation from a tree of UDTDescriptor
   def mkTypeInfo[T: c.WeakTypeTag](desc: UDTDescriptor): 
c.Expr[TypeInformation[T]] = desc match {
+
+    case f: FactoryTypeDescriptor => mkTypeInfoFromFactory(f)
+
     case cc@CaseClassDescriptor(_, tpe, _, _, _) =>
       
mkCaseClassTypeInfo(cc)(c.WeakTypeTag(tpe).asInstanceOf[c.WeakTypeTag[Product]])
         .asInstanceOf[c.Expr[TypeInformation[T]]]
@@ -93,6 +96,25 @@ private[flink] trait TypeInformationGen[C <: Context] {
     case d => mkGenericTypeInfo(d)
   }
 
+  /**
+   * Generates the expression that obtains the [[TypeInformation]] of a type from its
+   * type information factory.
+   *
+   * The generated code looks up the factory for the base type of the descriptor, maps the
+   * name of every type variable of that base type to the type information generated for
+   * the descriptor parameter at the same position, and passes this map to the factory.
+   */
+  def mkTypeInfoFromFactory[T: c.WeakTypeTag](desc: FactoryTypeDescriptor)
+    : c.Expr[TypeInformation[T]] = {
+
+    // class literals of the analyzed type and of the base type used for the factory lookup
+    val typeClass = c.Expr[Class[T]](Literal(Constant(desc.tpe)))
+    val baseClass = c.Expr[Class[T]](Literal(Constant(desc.baseType)))
+
+    // one type information tree per descriptor parameter, combined into a list expression
+    val paramTrees = desc.params.map(param => mkTypeInfo(param)(c.WeakTypeTag(param.tpe)).tree)
+    val paramInfos = c.Expr[List[TypeInformation[_]]](mkList(paramTrees.toList))
+
+    reify {
+      val typeInfoFactory = TypeExtractor.getTypeInfoFactory[T](baseClass.splice)
+      // pair each type information with the type variable at the same position
+      val namedParams = paramInfos.splice
+        .zip(baseClass.splice.getTypeParameters)
+        .map { case (info, typeVariable) => typeVariable.getName -> info }
+        .toMap[String, TypeInformation[_]]
+      typeInfoFactory.createTypeInfo(typeClass.splice, namedParams.asJava)
+    }
+  }
+
   def mkCaseClassTypeInfo[T <: Product : c.WeakTypeTag](
       desc: CaseClassDescriptor): c.Expr[TypeInformation[T]] = {
     val tpeClazz = c.Expr[Class[T]](Literal(Constant(desc.tpe)))

http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
index d658fde..2aecd7a 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
@@ -47,8 +47,10 @@ abstract class CaseClassTypeInfo[T <: Product](
   extends TupleTypeInfoBase[T](clazz, fieldTypes: _*) {
 
   @PublicEvolving
-  override def getGenericParameters: java.util.List[TypeInformation[_]] = {
-    typeParamTypeInfos.toList.asJava
+  override def getGenericParameters: java.util.Map[String, TypeInformation[_]] 
= {
+    typeParamTypeInfos.zipWithIndex.map { case (info, index) =>
+      "T" + (index + 1) -> info
+    }.toMap[String, TypeInformation[_]].asJava
   }
 
   private val REGEX_INT_FIELD: String = "[0-9]+"

http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
index 406f073..e897309 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
@@ -47,7 +47,8 @@ class EitherTypeInfo[A, B, T <: Either[A, B]](
   @PublicEvolving
   override def getTypeClass = clazz
   @PublicEvolving
-  override def getGenericParameters = List[TypeInformation[_]](leftTypeInfo, 
rightTypeInfo).asJava
+  override def getGenericParameters =
+    Map[String, TypeInformation[_]]("A" -> leftTypeInfo, "B" -> 
rightTypeInfo).asJava
 
   @PublicEvolving
   def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala
index 92d2704..efc6427 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala
@@ -46,7 +46,7 @@ class EnumValueTypeInfo[E <: Enumeration](val enum: E, val 
clazz: Class[E#Value]
   @PublicEvolving
   override def getTypeClass = clazz
   @PublicEvolving
-  override def getGenericParameters = List.empty[TypeInformation[_]].asJava
+  override def getGenericParameters = Map.empty[String, 
TypeInformation[_]].asJava
 
 
   @PublicEvolving

http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
index 58ae77c..73fe580 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
@@ -44,7 +44,7 @@ class OptionTypeInfo[A, T <: Option[A]](private val 
elemTypeInfo: TypeInformatio
   @PublicEvolving
   override def getTypeClass = classOf[Option[_]].asInstanceOf[Class[T]]
   @PublicEvolving
-  override def getGenericParameters = 
List[TypeInformation[_]](elemTypeInfo).asJava
+  override def getGenericParameters = Map[String, TypeInformation[_]]("A" -> 
elemTypeInfo).asJava
 
   @PublicEvolving
   override def createComparator(ascending: Boolean, executionConfig: 
ExecutionConfig) = {

http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
index 82fd8ae..47fb039 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
@@ -46,7 +46,8 @@ abstract class TraversableTypeInfo[T <: TraversableOnce[E], 
E](
   @PublicEvolving
   override def getTypeClass: Class[T] = clazz
   @PublicEvolving
-  override def getGenericParameters = 
List[TypeInformation[_]](elementTypeInfo).asJava
+  override def getGenericParameters =
+    Map[String, TypeInformation[_]]("A" -> elementTypeInfo).asJava
 
   @PublicEvolving
   def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T]

http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
index 0a5a06d..b09c353 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
@@ -46,7 +46,7 @@ class TryTypeInfo[A, T <: Try[A]](val elemTypeInfo: 
TypeInformation[A])
   @PublicEvolving
   override def getTypeClass = classOf[Try[_]].asInstanceOf[Class[T]]
   @PublicEvolving
-  override def getGenericParameters = 
List[TypeInformation[_]](elemTypeInfo).asJava
+  override def getGenericParameters = Map[String, TypeInformation[_]]("T" -> 
elemTypeInfo).asJava
 
   @PublicEvolving
   def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeInfoFactoryTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeInfoFactoryTest.scala
 
b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeInfoFactoryTest.scala
new file mode 100644
index 0000000..5873630
--- /dev/null
+++ 
b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeInfoFactoryTest.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import java.lang.reflect.Type
+import java.util
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInfoFactory, 
TypeInformation}
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.typeutils.TypeInfoFactoryTest._
+import org.apache.flink.api.java.typeutils.{EitherTypeInfo => 
JavaEitherTypeInfo}
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.typeutils.TypeInfoFactoryTest._
+import org.apache.flink.util.TestLogger
+import org.junit.Assert._
+import org.junit.Test
+import org.scalatest.junit.JUnitSuiteLike
+
+class TypeInfoFactoryTest  extends TestLogger with JUnitSuiteLike {
+
+  @Test
+  def testSimpleType(): Unit = {
+    val ti = createTypeInformation[ScalaIntLike]
+    assertEquals(INT_TYPE_INFO, ti)
+  }
+
+  @Test
+  def testMyTuple(): Unit = {
+    val ti = createTypeInformation[MyTuple[Double, String]]
+    assertTrue(ti.isInstanceOf[MyTupleTypeInfo[_, _]])
+    val mtti = ti.asInstanceOf[MyTupleTypeInfo[_, _]]
+    assertEquals(DOUBLE_TYPE_INFO, mtti.getField0)
+    assertEquals(STRING_TYPE_INFO, mtti.getField1)
+  }
+
+  @Test
+  def testMyTupleHierarchy(): Unit = { // covers MyTuple2 and a Scala class extending it
+    val ti = createTypeInformation[MyTuple2]
+    assertTrue(ti.isInstanceOf[MyTupleTypeInfo[_, _]])
+    val mtti = ti.asInstanceOf[MyTupleTypeInfo[_, _]]
+    assertEquals(STRING_TYPE_INFO, mtti.getField0)
+    assertEquals(BOOLEAN_TYPE_INFO, mtti.getField1)
+
+    val ti2 = createTypeInformation[MyScalaTupleClass] // MyScalaTupleClass extends MyTuple2
+    assertTrue(ti2.isInstanceOf[MyTupleTypeInfo[_, _]])
+    val mtti2 = ti2.asInstanceOf[MyTupleTypeInfo[_, _]]
+    assertEquals(STRING_TYPE_INFO, mtti2.getField0) // check the subclass result, not mtti again
+    assertEquals(BOOLEAN_TYPE_INFO, mtti2.getField1)
+  }
+
+  @Test
+  def testMyTupleHierarchyWithCaseClass(): Unit = {
+    val ti = createTypeInformation[MyScalaTupleCaseClass]
+    assertTrue(ti.isInstanceOf[MyTupleTypeInfo[_, _]])
+    val mtti = ti.asInstanceOf[MyTupleTypeInfo[_, _]]
+    assertEquals(DOUBLE_TYPE_INFO, mtti.getField0)
+    assertEquals(BOOLEAN_TYPE_INFO, mtti.getField1)
+  }
+
+  @Test
+  def testMyEitherGenericType(): Unit = {
+    val ti = createTypeInformation[MyScalaEither[String, (Double, Int)]]
+    assertTrue(ti.isInstanceOf[JavaEitherTypeInfo[_, _]])
+    val eti = ti.asInstanceOf[JavaEitherTypeInfo[_, _]]
+    assertEquals(STRING_TYPE_INFO, eti.getLeftType)
+    assertTrue(eti.getRightType.isInstanceOf[CaseClassTypeInfo[_]])
+    val cti = eti.getRightType.asInstanceOf[CaseClassTypeInfo[_]]
+    assertEquals(DOUBLE_TYPE_INFO, cti.getTypeAt(0))
+    assertEquals(INT_TYPE_INFO, cti.getTypeAt(1))
+  }
+
+  @Test
+  def testScalaFactory(): Unit = {
+    val ti = createTypeInformation[MyScalaOption[Double]]
+    assertTrue(ti.isInstanceOf[MyScalaOptionTypeInfo])
+    val moti = ti.asInstanceOf[MyScalaOptionTypeInfo]
+    assertEquals(DOUBLE_TYPE_INFO, moti.elementType)
+  }
+}
+
+// 
--------------------------------------------------------------------------------------------
+//  Utilities
+// 
--------------------------------------------------------------------------------------------
+
+object TypeInfoFactoryTest {
+
+  @TypeInfo(classOf[IntLikeTypeInfoFactory])
+  case class ScalaIntLike(myint: Int)
+
+  class MyScalaTupleClass extends MyTuple2
+
+  case class MyScalaTupleCaseClass(additional: Boolean) extends 
MyTuple3[Double]
+
+  @TypeInfo(classOf[MyEitherTypeInfoFactory[_, _]])
+  class MyScalaEither[A, B] {
+    // do nothing here
+  }
+
+  @TypeInfo(classOf[MyScalaOptionTypeInfoFactory])
+  class MyScalaOption[Z] {
+    // do nothing here
+  }
+
+  class MyScalaOptionTypeInfoFactory extends TypeInfoFactory[MyOption[_]] {
+
+    override def createTypeInfo(
+        t: Type,
+        genericParameters: util.Map[String, TypeInformation[_]])
+      : TypeInformation[MyOption[_]] = {
+      new MyScalaOptionTypeInfo(genericParameters.get("Z"))
+    }
+  }
+
+  class MyScalaOptionTypeInfo(val elementType: TypeInformation[_])
+    extends TypeInformation[MyOption[_]] {
+    
+    override def isBasicType: Boolean = ???
+
+    override def isTupleType: Boolean = ???
+
+    override def getArity: Int = ???
+
+    override def getTotalFields: Int = ???
+
+    override def getTypeClass: Class[MyOption[_]] = ???
+
+    override def isKeyType: Boolean = ???
+
+    override def createSerializer(config: ExecutionConfig): 
TypeSerializer[MyOption[_]] = ???
+
+    override def canEqual(obj: scala.Any): Boolean = ???
+
+    override def hashCode(): Int = ???
+
+    override def toString: String = ???
+
+    override def equals(obj: scala.Any): Boolean = ???
+  }
+}

Reply via email to