StephanEwen commented on a change in pull request #11666:
URL: https://github.com/apache/flink/pull/11666#discussion_r416661148
##########
File path:
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
##########
@@ -2132,4 +1940,321 @@ static void validateIfWritable(TypeInformation<?>
typeInfo, Type type) {
// ignore
}
}
+
+ /**
+ * Build the parameterized type hierarchy from subClass to baseClass.
+ * @param subClass the begin class of the type hierarchy
+ * @param baseClass the end class of the type hierarchy
+ * @return the type hierarchy.
+ */
+ private static List<Type> buildTypeHierarchy(final Class<?> subClass,
final Class<?> baseClass) {
+
+ final List<Type> typeHierarchy = new ArrayList<>();
+
+ if (baseClass.equals(subClass) ||
!baseClass.isAssignableFrom(subClass)) {
+ return Collections.emptyList();
+ }
+
+ final Type[] interfaceTypes = subClass.getGenericInterfaces();
+
+ for (Type type : interfaceTypes) {
+ if (baseClass.isAssignableFrom(typeToClass(type))) {
+ final List<Type> subTypeHierarchy =
buildTypeHierarchy(typeToClass(type), baseClass);
+ if (type instanceof ParameterizedType) {
+ typeHierarchy.add(type);
+ }
+ typeHierarchy.addAll(subTypeHierarchy);
+ return typeHierarchy;
+ }
+ }
+
+ if (baseClass.isAssignableFrom(subClass)) {
+ final Type type = subClass.getGenericSuperclass();
+ final List<Type> subTypeHierarchy =
buildTypeHierarchy(typeToClass(type), baseClass);
+ if (type instanceof ParameterizedType) {
+ typeHierarchy.add(type);
+ }
+ typeHierarchy.addAll(subTypeHierarchy);
+
+ return typeHierarchy;
+ }
+ return typeHierarchy;
+ }
+
+ /**
+ * Resolve all {@link TypeVariable}s of the type from the type
hierarchy.
+ * @param type the type needed to be resovled
+ * @param typeHierarchy the set of types which the {@link TypeVariable}
could be resovled from.
+ * @param resolveGenericArray whether to resolve the {@code
GenericArrayType} or not. This is for compatible.
+ * (Some code path resolves the component
type of a GenericArrayType. Some code path
+ * does not resolve the component type of
a GenericArray. A example case is
+ * {@link
TypeExtractorTest#testParameterizedArrays()})
+ * @return resolved type
+ */
+ private static Type resolveTypeFromTypeHierachy(final Type type, final
List<Type> typeHierarchy, final boolean resolveGenericArray) {
+ Type resolvedType = type;
+
+ if (type instanceof TypeVariable) {
+ resolvedType = materializeTypeVariable(typeHierarchy,
(TypeVariable) type);
+ }
+
+ if (resolvedType instanceof ParameterizedType) {
+ return resolveParameterizedType((ParameterizedType)
resolvedType, typeHierarchy, resolveGenericArray);
+ } else if (resolveGenericArray && resolvedType instanceof
GenericArrayType) {
+ return resolveGenericArrayType((GenericArrayType)
resolvedType, typeHierarchy);
+ }
+
+ return resolvedType;
+ }
+
+ /**
+ * Resolve all {@link TypeVariable}s of a {@link ParameterizedType}.
+ * @param parameterizedType the {@link ParameterizedType} needed to be
resovled.
+ * @param typeHierarchy the set of types which the {@link
TypeVariable}s could be resolved from.
+ * @param resolveGenericArray whether to resolve the {@code
GenericArrayType} or not. This is for compatible.
+ * @return resolved {@link ParameterizedType}
+ */
+ private static Type resolveParameterizedType(final ParameterizedType
parameterizedType, final List<Type> typeHierarchy, final boolean
resolveGenericArray) {
+
+ final Type[] actualTypeArguments = new
Type[parameterizedType.getActualTypeArguments().length];
+
+ int i = 0;
+ for (Type type : parameterizedType.getActualTypeArguments()) {
+ actualTypeArguments[i] =
resolveTypeFromTypeHierachy(type, typeHierarchy, resolveGenericArray);
+ i++;
+ }
+
+ return new
ResolvedParameterizedType(parameterizedType.getRawType(),
+ parameterizedType.getOwnerType(),
+ actualTypeArguments,
+ parameterizedType.getTypeName());
+ }
+
+ /**
+ * Resolve the component type of {@link GenericArrayType}.
+ * @param genericArrayType the {@link GenericArrayType} needed to be
resolved.
+ * @param typeHierarchy the set of types which the {@link
TypeVariable}s could be resolved from.
+ * @return resolved {@link GenericArrayType}
+ */
+ private static Type resolveGenericArrayType(final GenericArrayType
genericArrayType, final List<Type> typeHierarchy) {
+
+ final Type resolvedComponentType =
resolveTypeFromTypeHierachy(genericArrayType.getGenericComponentType(),
typeHierarchy, true);
+
+ return new
ResolvedGenericArrayType(genericArrayType.getTypeName(), resolvedComponentType);
+ }
+
+ private static class ResolvedGenericArrayType implements
GenericArrayType {
+
+ private final Type componentType;
+
+ private final String typeName;
+
+ public ResolvedGenericArrayType(String typeName, Type
componentType) {
+ this.componentType = componentType;
+ this.typeName = typeName;
+ }
+
+ @Override
+ public Type getGenericComponentType() {
+ return componentType;
+ }
+
+ public String getTypeName() {
+ return typeName;
+ }
+ }
+
+ private static class ResolvedParameterizedType implements
ParameterizedType {
+
+ private final Type rawType;
+
+ private final Type ownerType;
+
+ private final Type[] actualTypeArguments;
+
+ private final String typeName;
+
+ public ResolvedParameterizedType(Type rawType, Type ownerType,
Type[] actualTypeArguments, String typeName) {
+ this.rawType = rawType;
+ this.ownerType = ownerType;
+ this.actualTypeArguments = actualTypeArguments;
+ this.typeName = typeName;
+ }
+
+ @Override
+ public Type[] getActualTypeArguments() {
+ return actualTypeArguments;
+ }
+
+ @Override
+ public Type getRawType() {
+ return rawType;
+ }
+
+ @Override
+ public Type getOwnerType() {
+ return ownerType;
+ }
+
+ public String getTypeName() {
+ return typeName;
+ }
+ }
+
+ /**
+ * Bind the {@link TypeVariable} with {@link TypeInformation} from the
inputs' {@link TypeInformation}.
+ * @param clazz the sub class
+ * @param baseClazz the base class
+ * @param in1TypeInfo the {@link TypeInformation} of the first input
+ * @param in1Pos the position of type parameter of the first input in a
{@link Function} sub class
+ * @param in2TypeInfo the {@link TypeInformation} of the second input
+ * @param in2Pos the position of type parameter of the second input in
a {@link Function} sub class
+ * @param <IN1> the type of the first input
+ * @param <IN2> the type of the second input
+ * @return the mapping relation between {@link TypeVariable} and {@link
TypeInformation}
+ */
+ private static <IN1, IN2> Map<TypeVariable<?>, TypeInformation<?>>
bindTypeVariablesWithTypeInformationFromInputs(
+ final Class<?> clazz,
+ final Class<?> baseClazz,
+ final TypeInformation<IN1> in1TypeInfo,
+ final int in1Pos,
+ final TypeInformation<IN2> in2TypeInfo,
+ final int in2Pos) {
+
+ Map<TypeVariable<?>, TypeInformation<?>> typeVariableBindings =
Collections.emptyMap();
+
+ if (in1TypeInfo == null && in2TypeInfo == null) {
+ return typeVariableBindings;
+ }
+
+ final List<Type> functionTypeHierarchy =
buildTypeHierarchy(clazz, baseClazz);
+
+ if (functionTypeHierarchy.size() < 1) {
+ return typeVariableBindings;
+ }
+
+ final ParameterizedType baseClass = (ParameterizedType)
functionTypeHierarchy.get(functionTypeHierarchy.size() - 1);
Review comment:
This cast seems strange. Is the list always containing
`ParameterizedType`? If yes, it should by typed to that. If not, will this
simply crash in those cases?
##########
File path:
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
##########
@@ -2132,4 +1940,321 @@ static void validateIfWritable(TypeInformation<?>
typeInfo, Type type) {
// ignore
}
}
+
+ /**
+ * Build the parameterized type hierarchy from subClass to baseClass.
+ * @param subClass the begin class of the type hierarchy
+ * @param baseClass the end class of the type hierarchy
+ * @return the type hierarchy.
+ */
+ private static List<Type> buildTypeHierarchy(final Class<?> subClass,
final Class<?> baseClass) {
+
+ final List<Type> typeHierarchy = new ArrayList<>();
+
+ if (baseClass.equals(subClass) ||
!baseClass.isAssignableFrom(subClass)) {
+ return Collections.emptyList();
+ }
+
+ final Type[] interfaceTypes = subClass.getGenericInterfaces();
+
+ for (Type type : interfaceTypes) {
+ if (baseClass.isAssignableFrom(typeToClass(type))) {
+ final List<Type> subTypeHierarchy =
buildTypeHierarchy(typeToClass(type), baseClass);
+ if (type instanceof ParameterizedType) {
+ typeHierarchy.add(type);
+ }
+ typeHierarchy.addAll(subTypeHierarchy);
+ return typeHierarchy;
+ }
+ }
+
+ if (baseClass.isAssignableFrom(subClass)) {
+ final Type type = subClass.getGenericSuperclass();
+ final List<Type> subTypeHierarchy =
buildTypeHierarchy(typeToClass(type), baseClass);
+ if (type instanceof ParameterizedType) {
+ typeHierarchy.add(type);
+ }
+ typeHierarchy.addAll(subTypeHierarchy);
+
+ return typeHierarchy;
+ }
+ return typeHierarchy;
+ }
+
+ /**
+ * Resolve all {@link TypeVariable}s of the type from the type
hierarchy.
+ * @param type the type needed to be resovled
+ * @param typeHierarchy the set of types which the {@link TypeVariable}
could be resovled from.
+ * @param resolveGenericArray whether to resolve the {@code
GenericArrayType} or not. This is for compatible.
+ * (Some code path resolves the component
type of a GenericArrayType. Some code path
+ * does not resolve the component type of
a GenericArray. A example case is
+ * {@link
TypeExtractorTest#testParameterizedArrays()})
+ * @return resolved type
+ */
+ private static Type resolveTypeFromTypeHierachy(final Type type, final
List<Type> typeHierarchy, final boolean resolveGenericArray) {
+ Type resolvedType = type;
+
+ if (type instanceof TypeVariable) {
+ resolvedType = materializeTypeVariable(typeHierarchy,
(TypeVariable) type);
+ }
+
+ if (resolvedType instanceof ParameterizedType) {
+ return resolveParameterizedType((ParameterizedType)
resolvedType, typeHierarchy, resolveGenericArray);
+ } else if (resolveGenericArray && resolvedType instanceof
GenericArrayType) {
+ return resolveGenericArrayType((GenericArrayType)
resolvedType, typeHierarchy);
+ }
+
+ return resolvedType;
+ }
+
+ /**
+ * Resolve all {@link TypeVariable}s of a {@link ParameterizedType}.
+ * @param parameterizedType the {@link ParameterizedType} needed to be
resovled.
+ * @param typeHierarchy the set of types which the {@link
TypeVariable}s could be resolved from.
+ * @param resolveGenericArray whether to resolve the {@code
GenericArrayType} or not. This is for compatible.
+ * @return resolved {@link ParameterizedType}
+ */
+ private static Type resolveParameterizedType(final ParameterizedType
parameterizedType, final List<Type> typeHierarchy, final boolean
resolveGenericArray) {
+
+ final Type[] actualTypeArguments = new
Type[parameterizedType.getActualTypeArguments().length];
+
+ int i = 0;
+ for (Type type : parameterizedType.getActualTypeArguments()) {
+ actualTypeArguments[i] =
resolveTypeFromTypeHierachy(type, typeHierarchy, resolveGenericArray);
+ i++;
+ }
+
+ return new
ResolvedParameterizedType(parameterizedType.getRawType(),
+ parameterizedType.getOwnerType(),
+ actualTypeArguments,
+ parameterizedType.getTypeName());
+ }
+
+ /**
+ * Resolve the component type of {@link GenericArrayType}.
+ * @param genericArrayType the {@link GenericArrayType} needed to be
resolved.
+ * @param typeHierarchy the set of types which the {@link
TypeVariable}s could be resolved from.
+ * @return resolved {@link GenericArrayType}
+ */
+ private static Type resolveGenericArrayType(final GenericArrayType
genericArrayType, final List<Type> typeHierarchy) {
+
+ final Type resolvedComponentType =
resolveTypeFromTypeHierachy(genericArrayType.getGenericComponentType(),
typeHierarchy, true);
+
+ return new
ResolvedGenericArrayType(genericArrayType.getTypeName(), resolvedComponentType);
+ }
+
+ private static class ResolvedGenericArrayType implements
GenericArrayType {
+
+ private final Type componentType;
+
+ private final String typeName;
+
+ public ResolvedGenericArrayType(String typeName, Type
componentType) {
+ this.componentType = componentType;
+ this.typeName = typeName;
+ }
+
+ @Override
+ public Type getGenericComponentType() {
+ return componentType;
+ }
+
+ public String getTypeName() {
+ return typeName;
+ }
+ }
+
+ private static class ResolvedParameterizedType implements
ParameterizedType {
+
+ private final Type rawType;
+
+ private final Type ownerType;
+
+ private final Type[] actualTypeArguments;
+
+ private final String typeName;
+
+ public ResolvedParameterizedType(Type rawType, Type ownerType,
Type[] actualTypeArguments, String typeName) {
+ this.rawType = rawType;
+ this.ownerType = ownerType;
+ this.actualTypeArguments = actualTypeArguments;
+ this.typeName = typeName;
+ }
+
+ @Override
+ public Type[] getActualTypeArguments() {
+ return actualTypeArguments;
+ }
+
+ @Override
+ public Type getRawType() {
+ return rawType;
+ }
+
+ @Override
+ public Type getOwnerType() {
+ return ownerType;
+ }
+
+ public String getTypeName() {
+ return typeName;
+ }
+ }
+
+ /**
+ * Bind the {@link TypeVariable} with {@link TypeInformation} from the
inputs' {@link TypeInformation}.
+ * @param clazz the sub class
+ * @param baseClazz the base class
+ * @param in1TypeInfo the {@link TypeInformation} of the first input
+ * @param in1Pos the position of type parameter of the first input in a
{@link Function} sub class
+ * @param in2TypeInfo the {@link TypeInformation} of the second input
+ * @param in2Pos the position of type parameter of the second input in
a {@link Function} sub class
+ * @param <IN1> the type of the first input
+ * @param <IN2> the type of the second input
+ * @return the mapping relation between {@link TypeVariable} and {@link
TypeInformation}
+ */
+ private static <IN1, IN2> Map<TypeVariable<?>, TypeInformation<?>>
bindTypeVariablesWithTypeInformationFromInputs(
+ final Class<?> clazz,
+ final Class<?> baseClazz,
+ final TypeInformation<IN1> in1TypeInfo,
+ final int in1Pos,
+ final TypeInformation<IN2> in2TypeInfo,
+ final int in2Pos) {
+
+ Map<TypeVariable<?>, TypeInformation<?>> typeVariableBindings =
Collections.emptyMap();
+
+ if (in1TypeInfo == null && in2TypeInfo == null) {
+ return typeVariableBindings;
+ }
+
+ final List<Type> functionTypeHierarchy =
buildTypeHierarchy(clazz, baseClazz);
+
+ if (functionTypeHierarchy.size() < 1) {
+ return typeVariableBindings;
+ }
+
+ final ParameterizedType baseClass = (ParameterizedType)
functionTypeHierarchy.get(functionTypeHierarchy.size() - 1);
+
+ if (in1TypeInfo != null) {
+ final Type in1Type =
baseClass.getActualTypeArguments()[in1Pos];
+ final Type resolvedIn1Type =
resolveTypeFromTypeHierachy(in1Type, functionTypeHierarchy, false);
+ typeVariableBindings =
bindTypeVariablesWithTypeInformationFromInput(resolvedIn1Type, in1TypeInfo);
+ }
+
+ if (in2TypeInfo != null) {
+ final Type in2Type =
baseClass.getActualTypeArguments()[in2Pos];
+ final Type resolvedIn2Type =
resolveTypeFromTypeHierachy(in2Type, functionTypeHierarchy, false);
+ if
(!typeVariableBindings.equals(Collections.emptyMap())) {
Review comment:
See previous comment.
##########
File path:
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
##########
@@ -2132,4 +1940,321 @@ static void validateIfWritable(TypeInformation<?>
typeInfo, Type type) {
// ignore
}
}
+
+ /**
+ * Build the parameterized type hierarchy from subClass to baseClass.
+ * @param subClass the begin class of the type hierarchy
+ * @param baseClass the end class of the type hierarchy
+ * @return the type hierarchy.
+ */
+ private static List<Type> buildTypeHierarchy(final Class<?> subClass,
final Class<?> baseClass) {
+
+ final List<Type> typeHierarchy = new ArrayList<>();
+
+ if (baseClass.equals(subClass) ||
!baseClass.isAssignableFrom(subClass)) {
+ return Collections.emptyList();
+ }
+
+ final Type[] interfaceTypes = subClass.getGenericInterfaces();
+
+ for (Type type : interfaceTypes) {
+ if (baseClass.isAssignableFrom(typeToClass(type))) {
+ final List<Type> subTypeHierarchy =
buildTypeHierarchy(typeToClass(type), baseClass);
+ if (type instanceof ParameterizedType) {
+ typeHierarchy.add(type);
+ }
+ typeHierarchy.addAll(subTypeHierarchy);
+ return typeHierarchy;
+ }
+ }
+
+ if (baseClass.isAssignableFrom(subClass)) {
+ final Type type = subClass.getGenericSuperclass();
+ final List<Type> subTypeHierarchy =
buildTypeHierarchy(typeToClass(type), baseClass);
+ if (type instanceof ParameterizedType) {
+ typeHierarchy.add(type);
+ }
+ typeHierarchy.addAll(subTypeHierarchy);
+
+ return typeHierarchy;
+ }
+ return typeHierarchy;
+ }
+
+ /**
+ * Resolve all {@link TypeVariable}s of the type from the type
hierarchy.
+ * @param type the type needed to be resovled
+ * @param typeHierarchy the set of types which the {@link TypeVariable}
could be resovled from.
+ * @param resolveGenericArray whether to resolve the {@code
GenericArrayType} or not. This is for compatible.
+ * (Some code path resolves the component
type of a GenericArrayType. Some code path
+ * does not resolve the component type of
a GenericArray. A example case is
+ * {@link
TypeExtractorTest#testParameterizedArrays()})
+ * @return resolved type
+ */
+ private static Type resolveTypeFromTypeHierachy(final Type type, final
List<Type> typeHierarchy, final boolean resolveGenericArray) {
+ Type resolvedType = type;
+
+ if (type instanceof TypeVariable) {
+ resolvedType = materializeTypeVariable(typeHierarchy,
(TypeVariable) type);
+ }
+
+ if (resolvedType instanceof ParameterizedType) {
+ return resolveParameterizedType((ParameterizedType)
resolvedType, typeHierarchy, resolveGenericArray);
+ } else if (resolveGenericArray && resolvedType instanceof
GenericArrayType) {
+ return resolveGenericArrayType((GenericArrayType)
resolvedType, typeHierarchy);
+ }
+
+ return resolvedType;
+ }
+
+ /**
+ * Resolve all {@link TypeVariable}s of a {@link ParameterizedType}.
+ * @param parameterizedType the {@link ParameterizedType} needed to be
resovled.
+ * @param typeHierarchy the set of types which the {@link
TypeVariable}s could be resolved from.
+ * @param resolveGenericArray whether to resolve the {@code
GenericArrayType} or not. This is for compatible.
+ * @return resolved {@link ParameterizedType}
+ */
+ private static Type resolveParameterizedType(final ParameterizedType
parameterizedType, final List<Type> typeHierarchy, final boolean
resolveGenericArray) {
+
+ final Type[] actualTypeArguments = new
Type[parameterizedType.getActualTypeArguments().length];
+
+ int i = 0;
+ for (Type type : parameterizedType.getActualTypeArguments()) {
+ actualTypeArguments[i] =
resolveTypeFromTypeHierachy(type, typeHierarchy, resolveGenericArray);
+ i++;
+ }
+
+ return new
ResolvedParameterizedType(parameterizedType.getRawType(),
+ parameterizedType.getOwnerType(),
+ actualTypeArguments,
+ parameterizedType.getTypeName());
+ }
+
+ /**
+ * Resolve the component type of {@link GenericArrayType}.
+ * @param genericArrayType the {@link GenericArrayType} needed to be
resolved.
+ * @param typeHierarchy the set of types which the {@link
TypeVariable}s could be resolved from.
+ * @return resolved {@link GenericArrayType}
+ */
+ private static Type resolveGenericArrayType(final GenericArrayType
genericArrayType, final List<Type> typeHierarchy) {
+
+ final Type resolvedComponentType =
resolveTypeFromTypeHierachy(genericArrayType.getGenericComponentType(),
typeHierarchy, true);
+
+ return new
ResolvedGenericArrayType(genericArrayType.getTypeName(), resolvedComponentType);
+ }
+
+ private static class ResolvedGenericArrayType implements
GenericArrayType {
+
+ private final Type componentType;
+
+ private final String typeName;
+
+ public ResolvedGenericArrayType(String typeName, Type
componentType) {
+ this.componentType = componentType;
+ this.typeName = typeName;
+ }
+
+ @Override
+ public Type getGenericComponentType() {
+ return componentType;
+ }
+
+ public String getTypeName() {
+ return typeName;
+ }
+ }
+
+ private static class ResolvedParameterizedType implements
ParameterizedType {
+
+ private final Type rawType;
+
+ private final Type ownerType;
+
+ private final Type[] actualTypeArguments;
+
+ private final String typeName;
+
+ public ResolvedParameterizedType(Type rawType, Type ownerType,
Type[] actualTypeArguments, String typeName) {
+ this.rawType = rawType;
+ this.ownerType = ownerType;
+ this.actualTypeArguments = actualTypeArguments;
+ this.typeName = typeName;
+ }
+
+ @Override
+ public Type[] getActualTypeArguments() {
+ return actualTypeArguments;
+ }
+
+ @Override
+ public Type getRawType() {
+ return rawType;
+ }
+
+ @Override
+ public Type getOwnerType() {
+ return ownerType;
+ }
+
+ public String getTypeName() {
+ return typeName;
+ }
+ }
+
+ /**
+ * Bind the {@link TypeVariable} with {@link TypeInformation} from the
inputs' {@link TypeInformation}.
+ * @param clazz the sub class
+ * @param baseClazz the base class
+ * @param in1TypeInfo the {@link TypeInformation} of the first input
+ * @param in1Pos the position of type parameter of the first input in a
{@link Function} sub class
+ * @param in2TypeInfo the {@link TypeInformation} of the second input
+ * @param in2Pos the position of type parameter of the second input in
a {@link Function} sub class
+ * @param <IN1> the type of the first input
+ * @param <IN2> the type of the second input
+ * @return the mapping relation between {@link TypeVariable} and {@link
TypeInformation}
+ */
+ private static <IN1, IN2> Map<TypeVariable<?>, TypeInformation<?>>
bindTypeVariablesWithTypeInformationFromInputs(
+ final Class<?> clazz,
+ final Class<?> baseClazz,
+ final TypeInformation<IN1> in1TypeInfo,
+ final int in1Pos,
+ final TypeInformation<IN2> in2TypeInfo,
+ final int in2Pos) {
+
+ Map<TypeVariable<?>, TypeInformation<?>> typeVariableBindings =
Collections.emptyMap();
Review comment:
The use of the `typeVariableBindings` is a bit confusing. I would
suggest to rework this to avoid checks against what instance is assigned to the
variable.
##########
File path:
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
##########
@@ -2132,4 +1940,321 @@ static void validateIfWritable(TypeInformation<?>
typeInfo, Type type) {
// ignore
}
}
+
+ /**
+ * Build the parameterized type hierarchy from subClass to baseClass.
+ * @param subClass the begin class of the type hierarchy
+ * @param baseClass the end class of the type hierarchy
+ * @return the type hierarchy.
+ */
+ private static List<Type> buildTypeHierarchy(final Class<?> subClass,
final Class<?> baseClass) {
+
+ final List<Type> typeHierarchy = new ArrayList<>();
+
+ if (baseClass.equals(subClass) ||
!baseClass.isAssignableFrom(subClass)) {
+ return Collections.emptyList();
+ }
+
+ final Type[] interfaceTypes = subClass.getGenericInterfaces();
+
+ for (Type type : interfaceTypes) {
+ if (baseClass.isAssignableFrom(typeToClass(type))) {
+ final List<Type> subTypeHierarchy =
buildTypeHierarchy(typeToClass(type), baseClass);
+ if (type instanceof ParameterizedType) {
+ typeHierarchy.add(type);
+ }
+ typeHierarchy.addAll(subTypeHierarchy);
+ return typeHierarchy;
+ }
+ }
+
+ if (baseClass.isAssignableFrom(subClass)) {
+ final Type type = subClass.getGenericSuperclass();
+ final List<Type> subTypeHierarchy =
buildTypeHierarchy(typeToClass(type), baseClass);
+ if (type instanceof ParameterizedType) {
+ typeHierarchy.add(type);
+ }
+ typeHierarchy.addAll(subTypeHierarchy);
+
+ return typeHierarchy;
+ }
+ return typeHierarchy;
+ }
+
+ /**
+ * Resolve all {@link TypeVariable}s of the type from the type
hierarchy.
+ * @param type the type needed to be resovled
+ * @param typeHierarchy the set of types which the {@link TypeVariable}
could be resovled from.
+ * @param resolveGenericArray whether to resolve the {@code
GenericArrayType} or not. This is for compatible.
+ * (Some code path resolves the component
type of a GenericArrayType. Some code path
+ * does not resolve the component type of
a GenericArray. A example case is
+ * {@link
TypeExtractorTest#testParameterizedArrays()})
+ * @return resolved type
+ */
+ private static Type resolveTypeFromTypeHierachy(final Type type, final
List<Type> typeHierarchy, final boolean resolveGenericArray) {
+ Type resolvedType = type;
+
+ if (type instanceof TypeVariable) {
+ resolvedType = materializeTypeVariable(typeHierarchy,
(TypeVariable) type);
+ }
+
+ if (resolvedType instanceof ParameterizedType) {
+ return resolveParameterizedType((ParameterizedType)
resolvedType, typeHierarchy, resolveGenericArray);
+ } else if (resolveGenericArray && resolvedType instanceof
GenericArrayType) {
+ return resolveGenericArrayType((GenericArrayType)
resolvedType, typeHierarchy);
+ }
+
+ return resolvedType;
+ }
+
+ /**
+ * Resolve all {@link TypeVariable}s of a {@link ParameterizedType}.
+ * @param parameterizedType the {@link ParameterizedType} needed to be
resovled.
+ * @param typeHierarchy the set of types which the {@link
TypeVariable}s could be resolved from.
+ * @param resolveGenericArray whether to resolve the {@code
GenericArrayType} or not. This is for compatible.
+ * @return resolved {@link ParameterizedType}
+ */
+ private static Type resolveParameterizedType(final ParameterizedType
parameterizedType, final List<Type> typeHierarchy, final boolean
resolveGenericArray) {
+
+ final Type[] actualTypeArguments = new
Type[parameterizedType.getActualTypeArguments().length];
+
+ int i = 0;
+ for (Type type : parameterizedType.getActualTypeArguments()) {
+ actualTypeArguments[i] =
resolveTypeFromTypeHierachy(type, typeHierarchy, resolveGenericArray);
+ i++;
+ }
+
+ return new
ResolvedParameterizedType(parameterizedType.getRawType(),
+ parameterizedType.getOwnerType(),
+ actualTypeArguments,
+ parameterizedType.getTypeName());
+ }
+
+ /**
+ * Resolve the component type of {@link GenericArrayType}.
+ * @param genericArrayType the {@link GenericArrayType} needed to be
resolved.
+ * @param typeHierarchy the set of types which the {@link
TypeVariable}s could be resolved from.
+ * @return resolved {@link GenericArrayType}
+ */
+ private static Type resolveGenericArrayType(final GenericArrayType
genericArrayType, final List<Type> typeHierarchy) {
+
+ final Type resolvedComponentType =
resolveTypeFromTypeHierachy(genericArrayType.getGenericComponentType(),
typeHierarchy, true);
+
+ return new
ResolvedGenericArrayType(genericArrayType.getTypeName(), resolvedComponentType);
+ }
+
+ private static class ResolvedGenericArrayType implements
GenericArrayType {
+
+ private final Type componentType;
+
+ private final String typeName;
+
+ public ResolvedGenericArrayType(String typeName, Type
componentType) {
+ this.componentType = componentType;
+ this.typeName = typeName;
+ }
+
+ @Override
+ public Type getGenericComponentType() {
+ return componentType;
+ }
+
+ public String getTypeName() {
+ return typeName;
+ }
+ }
+
+ private static class ResolvedParameterizedType implements
ParameterizedType {
+
+ private final Type rawType;
+
+ private final Type ownerType;
+
+ private final Type[] actualTypeArguments;
+
+ private final String typeName;
+
+ public ResolvedParameterizedType(Type rawType, Type ownerType,
Type[] actualTypeArguments, String typeName) {
+ this.rawType = rawType;
+ this.ownerType = ownerType;
+ this.actualTypeArguments = actualTypeArguments;
+ this.typeName = typeName;
+ }
+
+ @Override
+ public Type[] getActualTypeArguments() {
+ return actualTypeArguments;
+ }
+
+ @Override
+ public Type getRawType() {
+ return rawType;
+ }
+
+ @Override
+ public Type getOwnerType() {
+ return ownerType;
+ }
+
+ public String getTypeName() {
+ return typeName;
+ }
+ }
+
+ /**
+ * Bind the {@link TypeVariable} with {@link TypeInformation} from the
inputs' {@link TypeInformation}.
+ * @param clazz the sub class
+ * @param baseClazz the base class
+ * @param in1TypeInfo the {@link TypeInformation} of the first input
+ * @param in1Pos the position of type parameter of the first input in a
{@link Function} sub class
+ * @param in2TypeInfo the {@link TypeInformation} of the second input
+ * @param in2Pos the position of type parameter of the second input in
a {@link Function} sub class
+ * @param <IN1> the type of the first input
+ * @param <IN2> the type of the second input
+ * @return the mapping relation between {@link TypeVariable} and {@link
TypeInformation}
+ */
+ private static <IN1, IN2> Map<TypeVariable<?>, TypeInformation<?>>
bindTypeVariablesWithTypeInformationFromInputs(
+ final Class<?> clazz,
+ final Class<?> baseClazz,
+ final TypeInformation<IN1> in1TypeInfo,
+ final int in1Pos,
+ final TypeInformation<IN2> in2TypeInfo,
+ final int in2Pos) {
+
+ Map<TypeVariable<?>, TypeInformation<?>> typeVariableBindings =
Collections.emptyMap();
+
+ if (in1TypeInfo == null && in2TypeInfo == null) {
+ return typeVariableBindings;
+ }
+
+ final List<Type> functionTypeHierarchy =
buildTypeHierarchy(clazz, baseClazz);
+
+ if (functionTypeHierarchy.size() < 1) {
+ return typeVariableBindings;
+ }
+
+ final ParameterizedType baseClass = (ParameterizedType)
functionTypeHierarchy.get(functionTypeHierarchy.size() - 1);
+
+ if (in1TypeInfo != null) {
+ final Type in1Type =
baseClass.getActualTypeArguments()[in1Pos];
+ final Type resolvedIn1Type =
resolveTypeFromTypeHierachy(in1Type, functionTypeHierarchy, false);
+ typeVariableBindings =
bindTypeVariablesWithTypeInformationFromInput(resolvedIn1Type, in1TypeInfo);
+ }
+
+ if (in2TypeInfo != null) {
+ final Type in2Type =
baseClass.getActualTypeArguments()[in2Pos];
+ final Type resolvedIn2Type =
resolveTypeFromTypeHierachy(in2Type, functionTypeHierarchy, false);
+ if
(!typeVariableBindings.equals(Collections.emptyMap())) {
+
typeVariableBindings.putAll(bindTypeVariablesWithTypeInformationFromInput(resolvedIn2Type,
in2TypeInfo));
+ }
+ }
+ return typeVariableBindings;
+ }
+
+ /**
+ * Bind the {@link TypeVariable} with {@link TypeInformation} from one
input's {@link TypeInformation}.
+ * @param inType the input type
+ * @param inTypeInfo the input's {@link TypeInformation}
+ * @return the mapping relation between {@link TypeVariable} and {@link
TypeInformation}
+ */
+ private static Map<TypeVariable<?>, TypeInformation<?>>
bindTypeVariablesWithTypeInformationFromInput(final Type inType, final
TypeInformation<?> inTypeInfo) {
+
+ final List<Type> factoryHierarchy = new
ArrayList<Type>(Arrays.asList(inType));
Review comment:
I don't fully understand the mechanics here. The list seems to be an
"out parameter" to return the hierarchy, but it is also populated with one
value initially that is then duplicated because it is also added within the
`getClosestFactory()` method.
##########
File path:
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
##########
@@ -98,9 +98,9 @@ private PojoTypeExtractor() {
}
@Override
- public <OUT, IN1, IN2> TypeInformation<OUT>
analyzePojo(Class<OUT> clazz, ArrayList<Type> typeHierarchy,
- ParameterizedType parameterizedType,
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
- return super.analyzePojo(clazz, typeHierarchy,
parameterizedType, in1Type, in2Type);
+ public <OUT> TypeInformation<OUT> analyzePojo(Class<OUT> clazz,
Review comment:
Maybe we can add a comment to `PojoTypeExtractor` that its only purpose
is to increase the visibility of the `analyzePojo(...)` method?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]