StephanEwen commented on a change in pull request #11666:
URL: https://github.com/apache/flink/pull/11666#discussion_r416661148



##########
File path: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
##########
@@ -2132,4 +1940,321 @@ static void validateIfWritable(TypeInformation<?> 
typeInfo, Type type) {
                        // ignore
                }
        }
+
+       /**
+        * Build the parameterized type hierarchy from subClass to baseClass.
+        * @param subClass the begin class of the type hierarchy
+        * @param baseClass the end class of the type hierarchy
+        * @return the type hierarchy.
+        */
+       private static List<Type> buildTypeHierarchy(final Class<?> subClass, 
final Class<?> baseClass) {
+
+               final List<Type> typeHierarchy = new ArrayList<>();
+
+               if (baseClass.equals(subClass) || 
!baseClass.isAssignableFrom(subClass)) {
+                       return Collections.emptyList();
+               }
+
+               final Type[] interfaceTypes = subClass.getGenericInterfaces();
+
+               for (Type type : interfaceTypes) {
+                       if (baseClass.isAssignableFrom(typeToClass(type))) {
+                               final List<Type> subTypeHierarchy = 
buildTypeHierarchy(typeToClass(type), baseClass);
+                               if (type instanceof ParameterizedType) {
+                                       typeHierarchy.add(type);
+                               }
+                               typeHierarchy.addAll(subTypeHierarchy);
+                               return typeHierarchy;
+                       }
+               }
+
+               if (baseClass.isAssignableFrom(subClass)) {
+                       final Type type = subClass.getGenericSuperclass();
+                       final List<Type> subTypeHierarchy = 
buildTypeHierarchy(typeToClass(type), baseClass);
+                       if (type instanceof ParameterizedType) {
+                               typeHierarchy.add(type);
+                       }
+                       typeHierarchy.addAll(subTypeHierarchy);
+
+                       return typeHierarchy;
+               }
+               return typeHierarchy;
+       }
+
+       /**
+        * Resolve all {@link TypeVariable}s of the type from the type 
hierarchy.
+        * @param type the type needed to be resovled
+        * @param typeHierarchy the set of types which the {@link TypeVariable} 
could be resovled from.
+        * @param resolveGenericArray whether to resolve the {@code 
GenericArrayType} or not. This is for compatible.
+        *                               (Some code path resolves the component 
type of a GenericArrayType. Some code path
+        *                               does not resolve the component type of 
a GenericArray. A example case is
+        *                               {@link 
TypeExtractorTest#testParameterizedArrays()})
+        * @return resolved type
+        */
+       private static Type resolveTypeFromTypeHierachy(final Type type, final 
List<Type> typeHierarchy, final boolean resolveGenericArray) {
+               Type resolvedType = type;
+
+               if (type instanceof TypeVariable) {
+                       resolvedType = materializeTypeVariable(typeHierarchy, 
(TypeVariable) type);
+               }
+
+               if (resolvedType instanceof ParameterizedType) {
+                       return resolveParameterizedType((ParameterizedType) 
resolvedType, typeHierarchy, resolveGenericArray);
+               } else if (resolveGenericArray && resolvedType instanceof 
GenericArrayType) {
+                       return resolveGenericArrayType((GenericArrayType) 
resolvedType, typeHierarchy);
+               }
+
+               return resolvedType;
+       }
+
+       /**
+        * Resolve all {@link TypeVariable}s of a {@link ParameterizedType}.
+        * @param parameterizedType the {@link ParameterizedType} needed to be 
resovled.
+        * @param typeHierarchy the set of types which the {@link 
TypeVariable}s could be resolved from.
+        * @param resolveGenericArray whether to resolve the {@code 
GenericArrayType} or not. This is for compatible.
+        * @return resolved {@link ParameterizedType}
+        */
+       private static Type resolveParameterizedType(final ParameterizedType 
parameterizedType, final List<Type> typeHierarchy, final boolean 
resolveGenericArray) {
+
+               final Type[] actualTypeArguments = new 
Type[parameterizedType.getActualTypeArguments().length];
+
+               int i = 0;
+               for (Type type : parameterizedType.getActualTypeArguments()) {
+                       actualTypeArguments[i] = 
resolveTypeFromTypeHierachy(type, typeHierarchy, resolveGenericArray);
+                       i++;
+               }
+
+               return new 
ResolvedParameterizedType(parameterizedType.getRawType(),
+                       parameterizedType.getOwnerType(),
+                       actualTypeArguments,
+                       parameterizedType.getTypeName());
+       }
+
+       /**
+        * Resolve the component type of {@link GenericArrayType}.
+        * @param genericArrayType the {@link GenericArrayType} needed to be 
resolved.
+        * @param typeHierarchy the set of types which the {@link 
TypeVariable}s could be resolved from.
+        * @return resolved {@link GenericArrayType}
+        */
+       private static Type resolveGenericArrayType(final GenericArrayType 
genericArrayType, final List<Type> typeHierarchy) {
+
+               final Type resolvedComponentType = 
resolveTypeFromTypeHierachy(genericArrayType.getGenericComponentType(), 
typeHierarchy, true);
+
+               return new 
ResolvedGenericArrayType(genericArrayType.getTypeName(), resolvedComponentType);
+       }
+
+       private static class ResolvedGenericArrayType implements 
GenericArrayType {
+
+               private final Type componentType;
+
+               private final String typeName;
+
+               public ResolvedGenericArrayType(String typeName, Type 
componentType) {
+                       this.componentType = componentType;
+                       this.typeName = typeName;
+               }
+
+               @Override
+               public Type getGenericComponentType() {
+                       return componentType;
+               }
+
+               public String getTypeName() {
+                       return typeName;
+               }
+       }
+
+       private static class ResolvedParameterizedType implements 
ParameterizedType {
+
+               private final Type rawType;
+
+               private final Type ownerType;
+
+               private final Type[] actualTypeArguments;
+
+               private final String typeName;
+
+               public ResolvedParameterizedType(Type rawType, Type ownerType, 
Type[] actualTypeArguments, String typeName) {
+                       this.rawType = rawType;
+                       this.ownerType = ownerType;
+                       this.actualTypeArguments = actualTypeArguments;
+                       this.typeName = typeName;
+               }
+
+               @Override
+               public Type[] getActualTypeArguments() {
+                       return actualTypeArguments;
+               }
+
+               @Override
+               public Type getRawType() {
+                       return rawType;
+               }
+
+               @Override
+               public Type getOwnerType() {
+                       return ownerType;
+               }
+
+               public String getTypeName() {
+                       return typeName;
+               }
+       }
+
+       /**
+        * Bind the {@link TypeVariable} with {@link TypeInformation} from the 
inputs' {@link TypeInformation}.
+        * @param clazz the sub class
+        * @param baseClazz the base class
+        * @param in1TypeInfo the {@link TypeInformation} of the first input
+        * @param in1Pos the position of type parameter of the first input in a 
{@link Function} sub class
+        * @param in2TypeInfo the {@link TypeInformation} of the second input
+        * @param in2Pos the position of type parameter of the second input in 
a {@link Function} sub class
+        * @param <IN1> the type of the first input
+        * @param <IN2> the type of the second input
+        * @return the mapping relation between {@link TypeVariable} and {@link 
TypeInformation}
+        */
+       private static <IN1, IN2> Map<TypeVariable<?>, TypeInformation<?>> 
bindTypeVariablesWithTypeInformationFromInputs(
+               final Class<?> clazz,
+               final Class<?> baseClazz,
+               final TypeInformation<IN1> in1TypeInfo,
+               final int in1Pos,
+               final TypeInformation<IN2> in2TypeInfo,
+               final int in2Pos) {
+
+               Map<TypeVariable<?>, TypeInformation<?>> typeVariableBindings = 
Collections.emptyMap();
+
+               if (in1TypeInfo == null && in2TypeInfo == null) {
+                       return typeVariableBindings;
+               }
+
+               final List<Type> functionTypeHierarchy = 
buildTypeHierarchy(clazz, baseClazz);
+
+               if (functionTypeHierarchy.size() < 1) {
+                       return typeVariableBindings;
+               }
+
+               final ParameterizedType baseClass = (ParameterizedType) 
functionTypeHierarchy.get(functionTypeHierarchy.size() - 1);

Review comment:
       This cast seems strange. Does the list always contain 
`ParameterizedType` elements? If yes, it should be typed accordingly. If not, 
will this simply crash in those cases?
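
One way to address this, as a minimal sketch rather than the PR's actual code: in the quoted hunk, `buildTypeHierarchy` only adds elements behind a `type instanceof ParameterizedType` check, so the list could plausibly be declared as `List<ParameterizedType>` and the cast dropped. The class `TypedHierarchySketch`, its helper methods, and the example types `Base`, `Mid`, `Impl` and `RawImpl` are hypothetical names introduced for illustration. The walk below is also simplified: it follows interfaces only, not superclasses.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class TypedHierarchySketch {

    // Hypothetical example types, not part of Flink.
    interface Base<T> {}
    interface Mid<T> extends Base<T> {}
    static class Impl implements Mid<String> {}
    @SuppressWarnings("rawtypes")
    static class RawImpl implements Base {}

    /** Returns the raw class behind a Class or ParameterizedType, or null otherwise. */
    static Class<?> rawClass(Type type) {
        if (type instanceof Class) {
            return (Class<?>) type;
        }
        if (type instanceof ParameterizedType) {
            return (Class<?>) ((ParameterizedType) type).getRawType();
        }
        return null;
    }

    /**
     * Collects the parameterized interface types on the path from start to base.
     * Because the element type is ParameterizedType, callers need no cast.
     * Simplification: only interfaces are followed, not superclasses.
     */
    public static List<ParameterizedType> parameterizedInterfaces(Class<?> start, Class<?> base) {
        final List<ParameterizedType> hierarchy = new ArrayList<>();
        Class<?> current = start;
        while (current != null && !current.equals(base)) {
            Class<?> next = null;
            for (Type type : current.getGenericInterfaces()) {
                final Class<?> raw = rawClass(type);
                if (raw != null && base.isAssignableFrom(raw)) {
                    // Raw (non-generic) usages are skipped instead of being added untyped.
                    if (type instanceof ParameterizedType) {
                        hierarchy.add((ParameterizedType) type);
                    }
                    next = raw;
                    break;
                }
            }
            current = next;
        }
        return hierarchy;
    }

    /** Returns the last element without a cast, or empty if the hierarchy has none. */
    public static Optional<ParameterizedType> last(List<ParameterizedType> hierarchy) {
        return hierarchy.isEmpty()
            ? Optional.empty()
            : Optional.of(hierarchy.get(hierarchy.size() - 1));
    }
}
```

With this shape, a raw implementation such as `RawImpl` yields an empty list, and `last` makes the "nothing to bind" case explicit instead of relying on a size check followed by a cast.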

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
##########
@@ -2132,4 +1940,321 @@ static void validateIfWritable(TypeInformation<?> 
typeInfo, Type type) {
                        // ignore
                }
        }
+
+       /**
+        * Build the parameterized type hierarchy from subClass to baseClass.
+        * @param subClass the begin class of the type hierarchy
+        * @param baseClass the end class of the type hierarchy
+        * @return the type hierarchy.
+        */
+       private static List<Type> buildTypeHierarchy(final Class<?> subClass, 
final Class<?> baseClass) {
+
+               final List<Type> typeHierarchy = new ArrayList<>();
+
+               if (baseClass.equals(subClass) || 
!baseClass.isAssignableFrom(subClass)) {
+                       return Collections.emptyList();
+               }
+
+               final Type[] interfaceTypes = subClass.getGenericInterfaces();
+
+               for (Type type : interfaceTypes) {
+                       if (baseClass.isAssignableFrom(typeToClass(type))) {
+                               final List<Type> subTypeHierarchy = 
buildTypeHierarchy(typeToClass(type), baseClass);
+                               if (type instanceof ParameterizedType) {
+                                       typeHierarchy.add(type);
+                               }
+                               typeHierarchy.addAll(subTypeHierarchy);
+                               return typeHierarchy;
+                       }
+               }
+
+               if (baseClass.isAssignableFrom(subClass)) {
+                       final Type type = subClass.getGenericSuperclass();
+                       final List<Type> subTypeHierarchy = 
buildTypeHierarchy(typeToClass(type), baseClass);
+                       if (type instanceof ParameterizedType) {
+                               typeHierarchy.add(type);
+                       }
+                       typeHierarchy.addAll(subTypeHierarchy);
+
+                       return typeHierarchy;
+               }
+               return typeHierarchy;
+       }
+
+       /**
+        * Resolve all {@link TypeVariable}s of the type from the type 
hierarchy.
+        * @param type the type needed to be resovled
+        * @param typeHierarchy the set of types which the {@link TypeVariable} 
could be resovled from.
+        * @param resolveGenericArray whether to resolve the {@code 
GenericArrayType} or not. This is for compatible.
+        *                               (Some code path resolves the component 
type of a GenericArrayType. Some code path
+        *                               does not resolve the component type of 
a GenericArray. A example case is
+        *                               {@link 
TypeExtractorTest#testParameterizedArrays()})
+        * @return resolved type
+        */
+       private static Type resolveTypeFromTypeHierachy(final Type type, final 
List<Type> typeHierarchy, final boolean resolveGenericArray) {
+               Type resolvedType = type;
+
+               if (type instanceof TypeVariable) {
+                       resolvedType = materializeTypeVariable(typeHierarchy, 
(TypeVariable) type);
+               }
+
+               if (resolvedType instanceof ParameterizedType) {
+                       return resolveParameterizedType((ParameterizedType) 
resolvedType, typeHierarchy, resolveGenericArray);
+               } else if (resolveGenericArray && resolvedType instanceof 
GenericArrayType) {
+                       return resolveGenericArrayType((GenericArrayType) 
resolvedType, typeHierarchy);
+               }
+
+               return resolvedType;
+       }
+
+       /**
+        * Resolve all {@link TypeVariable}s of a {@link ParameterizedType}.
+        * @param parameterizedType the {@link ParameterizedType} needed to be 
resovled.
+        * @param typeHierarchy the set of types which the {@link 
TypeVariable}s could be resolved from.
+        * @param resolveGenericArray whether to resolve the {@code 
GenericArrayType} or not. This is for compatible.
+        * @return resolved {@link ParameterizedType}
+        */
+       private static Type resolveParameterizedType(final ParameterizedType 
parameterizedType, final List<Type> typeHierarchy, final boolean 
resolveGenericArray) {
+
+               final Type[] actualTypeArguments = new 
Type[parameterizedType.getActualTypeArguments().length];
+
+               int i = 0;
+               for (Type type : parameterizedType.getActualTypeArguments()) {
+                       actualTypeArguments[i] = 
resolveTypeFromTypeHierachy(type, typeHierarchy, resolveGenericArray);
+                       i++;
+               }
+
+               return new 
ResolvedParameterizedType(parameterizedType.getRawType(),
+                       parameterizedType.getOwnerType(),
+                       actualTypeArguments,
+                       parameterizedType.getTypeName());
+       }
+
+       /**
+        * Resolve the component type of {@link GenericArrayType}.
+        * @param genericArrayType the {@link GenericArrayType} needed to be 
resolved.
+        * @param typeHierarchy the set of types which the {@link 
TypeVariable}s could be resolved from.
+        * @return resolved {@link GenericArrayType}
+        */
+       private static Type resolveGenericArrayType(final GenericArrayType 
genericArrayType, final List<Type> typeHierarchy) {
+
+               final Type resolvedComponentType = 
resolveTypeFromTypeHierachy(genericArrayType.getGenericComponentType(), 
typeHierarchy, true);
+
+               return new 
ResolvedGenericArrayType(genericArrayType.getTypeName(), resolvedComponentType);
+       }
+
+       private static class ResolvedGenericArrayType implements 
GenericArrayType {
+
+               private final Type componentType;
+
+               private final String typeName;
+
+               public ResolvedGenericArrayType(String typeName, Type 
componentType) {
+                       this.componentType = componentType;
+                       this.typeName = typeName;
+               }
+
+               @Override
+               public Type getGenericComponentType() {
+                       return componentType;
+               }
+
+               public String getTypeName() {
+                       return typeName;
+               }
+       }
+
+       private static class ResolvedParameterizedType implements 
ParameterizedType {
+
+               private final Type rawType;
+
+               private final Type ownerType;
+
+               private final Type[] actualTypeArguments;
+
+               private final String typeName;
+
+               public ResolvedParameterizedType(Type rawType, Type ownerType, 
Type[] actualTypeArguments, String typeName) {
+                       this.rawType = rawType;
+                       this.ownerType = ownerType;
+                       this.actualTypeArguments = actualTypeArguments;
+                       this.typeName = typeName;
+               }
+
+               @Override
+               public Type[] getActualTypeArguments() {
+                       return actualTypeArguments;
+               }
+
+               @Override
+               public Type getRawType() {
+                       return rawType;
+               }
+
+               @Override
+               public Type getOwnerType() {
+                       return ownerType;
+               }
+
+               public String getTypeName() {
+                       return typeName;
+               }
+       }
+
+       /**
+        * Bind the {@link TypeVariable} with {@link TypeInformation} from the 
inputs' {@link TypeInformation}.
+        * @param clazz the sub class
+        * @param baseClazz the base class
+        * @param in1TypeInfo the {@link TypeInformation} of the first input
+        * @param in1Pos the position of type parameter of the first input in a 
{@link Function} sub class
+        * @param in2TypeInfo the {@link TypeInformation} of the second input
+        * @param in2Pos the position of type parameter of the second input in 
a {@link Function} sub class
+        * @param <IN1> the type of the first input
+        * @param <IN2> the type of the second input
+        * @return the mapping relation between {@link TypeVariable} and {@link 
TypeInformation}
+        */
+       private static <IN1, IN2> Map<TypeVariable<?>, TypeInformation<?>> 
bindTypeVariablesWithTypeInformationFromInputs(
+               final Class<?> clazz,
+               final Class<?> baseClazz,
+               final TypeInformation<IN1> in1TypeInfo,
+               final int in1Pos,
+               final TypeInformation<IN2> in2TypeInfo,
+               final int in2Pos) {
+
+               Map<TypeVariable<?>, TypeInformation<?>> typeVariableBindings = 
Collections.emptyMap();
+
+               if (in1TypeInfo == null && in2TypeInfo == null) {
+                       return typeVariableBindings;
+               }
+
+               final List<Type> functionTypeHierarchy = 
buildTypeHierarchy(clazz, baseClazz);
+
+               if (functionTypeHierarchy.size() < 1) {
+                       return typeVariableBindings;
+               }
+
+               final ParameterizedType baseClass = (ParameterizedType) 
functionTypeHierarchy.get(functionTypeHierarchy.size() - 1);
+
+               if (in1TypeInfo != null) {
+                       final Type in1Type = 
baseClass.getActualTypeArguments()[in1Pos];
+                       final Type resolvedIn1Type = 
resolveTypeFromTypeHierachy(in1Type, functionTypeHierarchy, false);
+                       typeVariableBindings = 
bindTypeVariablesWithTypeInformationFromInput(resolvedIn1Type, in1TypeInfo);
+               }
+
+               if (in2TypeInfo != null) {
+                       final Type in2Type = 
baseClass.getActualTypeArguments()[in2Pos];
+                       final Type resolvedIn2Type = 
resolveTypeFromTypeHierachy(in2Type, functionTypeHierarchy, false);
+                       if 
(!typeVariableBindings.equals(Collections.emptyMap())) {

Review comment:
       See the comment on the declaration of `typeVariableBindings`.

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
##########
@@ -2132,4 +1940,321 @@ static void validateIfWritable(TypeInformation<?> 
typeInfo, Type type) {
                        // ignore
                }
        }
+
+       /**
+        * Build the parameterized type hierarchy from subClass to baseClass.
+        * @param subClass the begin class of the type hierarchy
+        * @param baseClass the end class of the type hierarchy
+        * @return the type hierarchy.
+        */
+       private static List<Type> buildTypeHierarchy(final Class<?> subClass, 
final Class<?> baseClass) {
+
+               final List<Type> typeHierarchy = new ArrayList<>();
+
+               if (baseClass.equals(subClass) || 
!baseClass.isAssignableFrom(subClass)) {
+                       return Collections.emptyList();
+               }
+
+               final Type[] interfaceTypes = subClass.getGenericInterfaces();
+
+               for (Type type : interfaceTypes) {
+                       if (baseClass.isAssignableFrom(typeToClass(type))) {
+                               final List<Type> subTypeHierarchy = 
buildTypeHierarchy(typeToClass(type), baseClass);
+                               if (type instanceof ParameterizedType) {
+                                       typeHierarchy.add(type);
+                               }
+                               typeHierarchy.addAll(subTypeHierarchy);
+                               return typeHierarchy;
+                       }
+               }
+
+               if (baseClass.isAssignableFrom(subClass)) {
+                       final Type type = subClass.getGenericSuperclass();
+                       final List<Type> subTypeHierarchy = 
buildTypeHierarchy(typeToClass(type), baseClass);
+                       if (type instanceof ParameterizedType) {
+                               typeHierarchy.add(type);
+                       }
+                       typeHierarchy.addAll(subTypeHierarchy);
+
+                       return typeHierarchy;
+               }
+               return typeHierarchy;
+       }
+
+       /**
+        * Resolve all {@link TypeVariable}s of the type from the type 
hierarchy.
+        * @param type the type needed to be resovled
+        * @param typeHierarchy the set of types which the {@link TypeVariable} 
could be resovled from.
+        * @param resolveGenericArray whether to resolve the {@code 
GenericArrayType} or not. This is for compatible.
+        *                               (Some code path resolves the component 
type of a GenericArrayType. Some code path
+        *                               does not resolve the component type of 
a GenericArray. A example case is
+        *                               {@link 
TypeExtractorTest#testParameterizedArrays()})
+        * @return resolved type
+        */
+       private static Type resolveTypeFromTypeHierachy(final Type type, final 
List<Type> typeHierarchy, final boolean resolveGenericArray) {
+               Type resolvedType = type;
+
+               if (type instanceof TypeVariable) {
+                       resolvedType = materializeTypeVariable(typeHierarchy, 
(TypeVariable) type);
+               }
+
+               if (resolvedType instanceof ParameterizedType) {
+                       return resolveParameterizedType((ParameterizedType) 
resolvedType, typeHierarchy, resolveGenericArray);
+               } else if (resolveGenericArray && resolvedType instanceof 
GenericArrayType) {
+                       return resolveGenericArrayType((GenericArrayType) 
resolvedType, typeHierarchy);
+               }
+
+               return resolvedType;
+       }
+
+       /**
+        * Resolve all {@link TypeVariable}s of a {@link ParameterizedType}.
+        * @param parameterizedType the {@link ParameterizedType} needed to be 
resovled.
+        * @param typeHierarchy the set of types which the {@link 
TypeVariable}s could be resolved from.
+        * @param resolveGenericArray whether to resolve the {@code 
GenericArrayType} or not. This is for compatible.
+        * @return resolved {@link ParameterizedType}
+        */
+       private static Type resolveParameterizedType(final ParameterizedType 
parameterizedType, final List<Type> typeHierarchy, final boolean 
resolveGenericArray) {
+
+               final Type[] actualTypeArguments = new 
Type[parameterizedType.getActualTypeArguments().length];
+
+               int i = 0;
+               for (Type type : parameterizedType.getActualTypeArguments()) {
+                       actualTypeArguments[i] = 
resolveTypeFromTypeHierachy(type, typeHierarchy, resolveGenericArray);
+                       i++;
+               }
+
+               return new 
ResolvedParameterizedType(parameterizedType.getRawType(),
+                       parameterizedType.getOwnerType(),
+                       actualTypeArguments,
+                       parameterizedType.getTypeName());
+       }
+
+       /**
+        * Resolve the component type of {@link GenericArrayType}.
+        * @param genericArrayType the {@link GenericArrayType} needed to be 
resolved.
+        * @param typeHierarchy the set of types which the {@link 
TypeVariable}s could be resolved from.
+        * @return resolved {@link GenericArrayType}
+        */
+       private static Type resolveGenericArrayType(final GenericArrayType 
genericArrayType, final List<Type> typeHierarchy) {
+
+               final Type resolvedComponentType = 
resolveTypeFromTypeHierachy(genericArrayType.getGenericComponentType(), 
typeHierarchy, true);
+
+               return new 
ResolvedGenericArrayType(genericArrayType.getTypeName(), resolvedComponentType);
+       }
+
+       private static class ResolvedGenericArrayType implements 
GenericArrayType {
+
+               private final Type componentType;
+
+               private final String typeName;
+
+               public ResolvedGenericArrayType(String typeName, Type 
componentType) {
+                       this.componentType = componentType;
+                       this.typeName = typeName;
+               }
+
+               @Override
+               public Type getGenericComponentType() {
+                       return componentType;
+               }
+
+               public String getTypeName() {
+                       return typeName;
+               }
+       }
+
+       private static class ResolvedParameterizedType implements 
ParameterizedType {
+
+               private final Type rawType;
+
+               private final Type ownerType;
+
+               private final Type[] actualTypeArguments;
+
+               private final String typeName;
+
+               public ResolvedParameterizedType(Type rawType, Type ownerType, 
Type[] actualTypeArguments, String typeName) {
+                       this.rawType = rawType;
+                       this.ownerType = ownerType;
+                       this.actualTypeArguments = actualTypeArguments;
+                       this.typeName = typeName;
+               }
+
+               @Override
+               public Type[] getActualTypeArguments() {
+                       return actualTypeArguments;
+               }
+
+               @Override
+               public Type getRawType() {
+                       return rawType;
+               }
+
+               @Override
+               public Type getOwnerType() {
+                       return ownerType;
+               }
+
+               public String getTypeName() {
+                       return typeName;
+               }
+       }
+
+       /**
+        * Bind the {@link TypeVariable} with {@link TypeInformation} from the 
inputs' {@link TypeInformation}.
+        * @param clazz the sub class
+        * @param baseClazz the base class
+        * @param in1TypeInfo the {@link TypeInformation} of the first input
+        * @param in1Pos the position of type parameter of the first input in a 
{@link Function} sub class
+        * @param in2TypeInfo the {@link TypeInformation} of the second input
+        * @param in2Pos the position of type parameter of the second input in 
a {@link Function} sub class
+        * @param <IN1> the type of the first input
+        * @param <IN2> the type of the second input
+        * @return the mapping relation between {@link TypeVariable} and {@link 
TypeInformation}
+        */
+       private static <IN1, IN2> Map<TypeVariable<?>, TypeInformation<?>> 
bindTypeVariablesWithTypeInformationFromInputs(
+               final Class<?> clazz,
+               final Class<?> baseClazz,
+               final TypeInformation<IN1> in1TypeInfo,
+               final int in1Pos,
+               final TypeInformation<IN2> in2TypeInfo,
+               final int in2Pos) {
+
+               Map<TypeVariable<?>, TypeInformation<?>> typeVariableBindings = 
Collections.emptyMap();

Review comment:
       The use of `typeVariableBindings` is a bit confusing. I would suggest 
reworking this so that the code does not need to check which instance is 
assigned to the variable.
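
A possible shape for such a rework, as a sketch under assumptions and not the PR's actual code: accumulate into one fresh mutable map, so there is never a need to compare against `Collections.emptyMap()`. The class `BindingsSketch` and the method `mergeBindings` are hypothetical; the generic parameters `K` and `V` stand in for `TypeVariable<?>` and `TypeInformation<?>`, and a `null` argument stands in for "no type information for that input".

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class BindingsSketch {

    /**
     * Merges the bindings derived from each input into one map.
     * The result always starts as a fresh HashMap, so callers never have to
     * test whether the variable still points at a shared empty-map instance.
     */
    public static <K, V> Map<K, V> mergeBindings(
            final Map<K, V> fromFirstInput,
            final Map<K, V> fromSecondInput) {

        final Map<K, V> bindings = new HashMap<>();

        // A null argument means that input contributed no bindings.
        if (fromFirstInput != null) {
            bindings.putAll(fromFirstInput);
        }
        if (fromSecondInput != null) {
            bindings.putAll(fromSecondInput);
        }

        // Expose a read-only view; an empty result is simply an empty map.
        return Collections.unmodifiableMap(bindings);
    }
}
```

The early returns for "no inputs" and "empty hierarchy" could then return `Collections.emptyMap()` directly, while the accumulating path only ever calls `putAll`.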

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
##########
@@ -2132,4 +1940,321 @@ static void validateIfWritable(TypeInformation<?> 
typeInfo, Type type) {
                        // ignore
                }
        }
+
+       /**
+        * Build the parameterized type hierarchy from subClass to baseClass.
+        * @param subClass the begin class of the type hierarchy
+        * @param baseClass the end class of the type hierarchy
+        * @return the type hierarchy.
+        */
+       private static List<Type> buildTypeHierarchy(final Class<?> subClass, 
final Class<?> baseClass) {
+
+               final List<Type> typeHierarchy = new ArrayList<>();
+
+               if (baseClass.equals(subClass) || 
!baseClass.isAssignableFrom(subClass)) {
+                       return Collections.emptyList();
+               }
+
+               final Type[] interfaceTypes = subClass.getGenericInterfaces();
+
+               for (Type type : interfaceTypes) {
+                       if (baseClass.isAssignableFrom(typeToClass(type))) {
+                               final List<Type> subTypeHierarchy = 
buildTypeHierarchy(typeToClass(type), baseClass);
+                               if (type instanceof ParameterizedType) {
+                                       typeHierarchy.add(type);
+                               }
+                               typeHierarchy.addAll(subTypeHierarchy);
+                               return typeHierarchy;
+                       }
+               }
+
+               if (baseClass.isAssignableFrom(subClass)) {
+                       final Type type = subClass.getGenericSuperclass();
+                       final List<Type> subTypeHierarchy = 
buildTypeHierarchy(typeToClass(type), baseClass);
+                       if (type instanceof ParameterizedType) {
+                               typeHierarchy.add(type);
+                       }
+                       typeHierarchy.addAll(subTypeHierarchy);
+
+                       return typeHierarchy;
+               }
+               return typeHierarchy;
+       }
+
+       /**
+        * Resolve all {@link TypeVariable}s of the type from the type 
hierarchy.
+        * @param type the type needed to be resovled
+        * @param typeHierarchy the set of types which the {@link TypeVariable} 
could be resovled from.
+        * @param resolveGenericArray whether to resolve the {@code 
GenericArrayType} or not. This is for compatible.
+        *                               (Some code path resolves the component 
type of a GenericArrayType. Some code path
+        *                               does not resolve the component type of 
a GenericArray. A example case is
+        *                               {@link 
TypeExtractorTest#testParameterizedArrays()})
+        * @return resolved type
+        */
+       private static Type resolveTypeFromTypeHierachy(final Type type, final 
List<Type> typeHierarchy, final boolean resolveGenericArray) {
+               Type resolvedType = type;
+
+               if (type instanceof TypeVariable) {
+                       resolvedType = materializeTypeVariable(typeHierarchy, 
(TypeVariable) type);
+               }
+
+               if (resolvedType instanceof ParameterizedType) {
+                       return resolveParameterizedType((ParameterizedType) 
resolvedType, typeHierarchy, resolveGenericArray);
+               } else if (resolveGenericArray && resolvedType instanceof 
GenericArrayType) {
+                       return resolveGenericArrayType((GenericArrayType) 
resolvedType, typeHierarchy);
+               }
+
+               return resolvedType;
+       }
+
+       /**
+        * Resolve all {@link TypeVariable}s of a {@link ParameterizedType}.
+        * @param parameterizedType the {@link ParameterizedType} needed to be 
resolved.
+        * @param typeHierarchy the set of types which the {@link 
TypeVariable}s could be resolved from.
+        * @param resolveGenericArray whether to resolve the {@code 
GenericArrayType} or not. This is for compatibility.
+        * @return resolved {@link ParameterizedType}
+        */
+       private static Type resolveParameterizedType(final ParameterizedType 
parameterizedType, final List<Type> typeHierarchy, final boolean 
resolveGenericArray) {
+
+               final Type[] actualTypeArguments = new 
Type[parameterizedType.getActualTypeArguments().length];
+
+               int i = 0;
+               for (Type type : parameterizedType.getActualTypeArguments()) {
+                       actualTypeArguments[i] = 
resolveTypeFromTypeHierachy(type, typeHierarchy, resolveGenericArray);
+                       i++;
+               }
+
+               return new 
ResolvedParameterizedType(parameterizedType.getRawType(),
+                       parameterizedType.getOwnerType(),
+                       actualTypeArguments,
+                       parameterizedType.getTypeName());
+       }
+
+       /**
+        * Resolve the component type of {@link GenericArrayType}.
+        * @param genericArrayType the {@link GenericArrayType} needed to be 
resolved.
+        * @param typeHierarchy the set of types which the {@link 
TypeVariable}s could be resolved from.
+        * @return resolved {@link GenericArrayType}
+        */
+       private static Type resolveGenericArrayType(final GenericArrayType 
genericArrayType, final List<Type> typeHierarchy) {
+
+               final Type resolvedComponentType = 
resolveTypeFromTypeHierachy(genericArrayType.getGenericComponentType(), 
typeHierarchy, true);
+
+               return new 
ResolvedGenericArrayType(genericArrayType.getTypeName(), resolvedComponentType);
+       }
+
+       private static class ResolvedGenericArrayType implements 
GenericArrayType {
+
+               private final Type componentType;
+
+               private final String typeName;
+
+               public ResolvedGenericArrayType(String typeName, Type 
componentType) {
+                       this.componentType = componentType;
+                       this.typeName = typeName;
+               }
+
+               @Override
+               public Type getGenericComponentType() {
+                       return componentType;
+               }
+
+               public String getTypeName() {
+                       return typeName;
+               }
+       }
+
+       private static class ResolvedParameterizedType implements 
ParameterizedType {
+
+               private final Type rawType;
+
+               private final Type ownerType;
+
+               private final Type[] actualTypeArguments;
+
+               private final String typeName;
+
+               public ResolvedParameterizedType(Type rawType, Type ownerType, 
Type[] actualTypeArguments, String typeName) {
+                       this.rawType = rawType;
+                       this.ownerType = ownerType;
+                       this.actualTypeArguments = actualTypeArguments;
+                       this.typeName = typeName;
+               }
+
+               @Override
+               public Type[] getActualTypeArguments() {
+                       return actualTypeArguments;
+               }
+
+               @Override
+               public Type getRawType() {
+                       return rawType;
+               }
+
+               @Override
+               public Type getOwnerType() {
+                       return ownerType;
+               }
+
+               public String getTypeName() {
+                       return typeName;
+               }
+       }
+
+       /**
+        * Bind the {@link TypeVariable} with {@link TypeInformation} from the 
inputs' {@link TypeInformation}.
+        * @param clazz the sub class
+        * @param baseClazz the base class
+        * @param in1TypeInfo the {@link TypeInformation} of the first input
+        * @param in1Pos the position of type parameter of the first input in a 
{@link Function} sub class
+        * @param in2TypeInfo the {@link TypeInformation} of the second input
+        * @param in2Pos the position of type parameter of the second input in 
a {@link Function} sub class
+        * @param <IN1> the type of the first input
+        * @param <IN2> the type of the second input
+        * @return the mapping relation between {@link TypeVariable} and {@link 
TypeInformation}
+        */
+       private static <IN1, IN2> Map<TypeVariable<?>, TypeInformation<?>> 
bindTypeVariablesWithTypeInformationFromInputs(
+               final Class<?> clazz,
+               final Class<?> baseClazz,
+               final TypeInformation<IN1> in1TypeInfo,
+               final int in1Pos,
+               final TypeInformation<IN2> in2TypeInfo,
+               final int in2Pos) {
+
+               Map<TypeVariable<?>, TypeInformation<?>> typeVariableBindings = 
Collections.emptyMap();
+
+               if (in1TypeInfo == null && in2TypeInfo == null) {
+                       return typeVariableBindings;
+               }
+
+               final List<Type> functionTypeHierarchy = 
buildTypeHierarchy(clazz, baseClazz);
+
+               if (functionTypeHierarchy.size() < 1) {
+                       return typeVariableBindings;
+               }
+
+               final ParameterizedType baseClass = (ParameterizedType) 
functionTypeHierarchy.get(functionTypeHierarchy.size() - 1);
+
+               if (in1TypeInfo != null) {
+                       final Type in1Type = 
baseClass.getActualTypeArguments()[in1Pos];
+                       final Type resolvedIn1Type = 
resolveTypeFromTypeHierachy(in1Type, functionTypeHierarchy, false);
+                       typeVariableBindings = 
bindTypeVariablesWithTypeInformationFromInput(resolvedIn1Type, in1TypeInfo);
+               }
+
+               if (in2TypeInfo != null) {
+                       final Type in2Type = 
baseClass.getActualTypeArguments()[in2Pos];
+                       final Type resolvedIn2Type = 
resolveTypeFromTypeHierachy(in2Type, functionTypeHierarchy, false);
+                       if 
(!typeVariableBindings.equals(Collections.emptyMap())) {
+                               
typeVariableBindings.putAll(bindTypeVariablesWithTypeInformationFromInput(resolvedIn2Type,
 in2TypeInfo));
+                       }
+               }
+               return typeVariableBindings;
+       }
+
+       /**
+        * Bind the {@link TypeVariable} with {@link TypeInformation} from one 
input's {@link TypeInformation}.
+        * @param inType the input type
+        * @param inTypeInfo the input's {@link TypeInformation}
+        * @return the mapping relation between {@link TypeVariable} and {@link 
TypeInformation}
+        */
+       private static Map<TypeVariable<?>, TypeInformation<?>> 
bindTypeVariablesWithTypeInformationFromInput(final Type inType, final 
TypeInformation<?> inTypeInfo) {
+
+               final List<Type> factoryHierarchy = new 
ArrayList<Type>(Arrays.asList(inType));

Review comment:
       I don't fully understand the mechanics here. The list seems to be an 
"out parameter" to return the hierarchy, but it is also populated with one 
value initially that is then duplicated because it is also added within the 
`getClosestFactory()` method.
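
       To make the concern concrete, here is a minimal sketch of the pattern as 
described in the comment above. It is not the Flink code: `findClosest` is a 
hypothetical stand-in for `getClosestFactory()`, assumed (per the comment) to 
add the starting type to the caller-supplied list, and `String` stands in for 
`Type`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OutParameterSketch {

    // Hypothetical stand-in for getClosestFactory(): it records the type it
    // visits in the caller-supplied list, which acts as an "out parameter".
    static String findClosest(List<String> hierarchyOut, String start) {
        hierarchyOut.add(start);
        return start;
    }

    // Pattern questioned in the review: the list is seeded with the start
    // type and then handed to a method that adds the same type again.
    static List<String> preSeeded(String start) {
        List<String> hierarchy = new ArrayList<>(Arrays.asList(start));
        findClosest(hierarchy, start);
        return hierarchy;
    }

    // Alternative: start from an empty list and let the callee fill it.
    static List<String> emptyStart(String start) {
        List<String> hierarchy = new ArrayList<>();
        findClosest(hierarchy, start);
        return hierarchy;
    }

    public static void main(String[] args) {
        System.out.println(preSeeded("MyType"));  // start type appears twice
        System.out.println(emptyStart("MyType")); // start type appears once
    }
}
```

       If the duplicate entry is intentional, a short comment at the call site 
would help; otherwise starting from an empty list looks simpler.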

##########
File path: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
##########
@@ -98,9 +98,9 @@ private PojoTypeExtractor() {
                }
 
                @Override
-               public <OUT, IN1, IN2> TypeInformation<OUT> 
analyzePojo(Class<OUT> clazz, ArrayList<Type> typeHierarchy,
-                               ParameterizedType parameterizedType, 
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
-                       return super.analyzePojo(clazz, typeHierarchy, 
parameterizedType, in1Type, in2Type);
+               public <OUT> TypeInformation<OUT> analyzePojo(Class<OUT> clazz,

Review comment:
       Maybe we can add a comment to `PojoTypeExtractor` that its only purpose 
is to increase the visibility of the `analyzePojo(...)` method?
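
       Something along these lines, as a rough sketch only. `BaseExtractor` and 
`PublicExtractor` are hypothetical stand-ins (not the real `TypeExtractor` and 
`PojoTypeExtractor`), and the method signature is simplified for illustration.

```java
public class VisibilitySketch {

    // Hypothetical stand-in for the base extractor: analyze() is protected,
    // so code outside the package cannot call it directly.
    static class BaseExtractor {
        protected String analyze(Class<?> clazz) {
            return "analyzed " + clazz.getSimpleName();
        }
    }

    /**
     * Exists solely to widen the visibility of
     * {@link BaseExtractor#analyze(Class)} from protected to public.
     * It adds no behavior of its own.
     */
    static class PublicExtractor extends BaseExtractor {
        @Override
        public String analyze(Class<?> clazz) {
            return super.analyze(clazz);
        }
    }

    public static void main(String[] args) {
        System.out.println(new PublicExtractor().analyze(String.class));
    }
}
```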




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]