Repository: flink Updated Branches: refs/heads/master f11447e58 -> d4d7cc326
[FLINK-4673] [core] TypeInfoFactory for Either type Removes from TypeExtractor the explicit parsing for Either and adds an EitherTypeInfoFactory. This closes #2545. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d4d7cc32 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d4d7cc32 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d4d7cc32 Branch: refs/heads/master Commit: d4d7cc32667016d66c65a7d64601cabd101a0c4d Parents: f11447e Author: Greg Hogan <[email protected]> Authored: Fri Sep 23 14:59:36 2016 -0400 Committer: twalthr <[email protected]> Committed: Tue Jan 10 14:52:38 2017 +0100 ---------------------------------------------------------------------- .../api/java/typeutils/EitherTypeInfo.java | 20 +++- .../java/typeutils/EitherTypeInfoFactory.java | 48 ++++++++ .../flink/api/java/typeutils/TypeExtractor.java | 120 ++++--------------- .../java/org/apache/flink/types/Either.java | 3 + 4 files changed, 91 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d4d7cc32/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java index 058de12..d4d1aa9 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java @@ -18,14 +18,19 @@ package org.apache.flink.api.java.typeutils; -import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.Public; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.runtime.EitherSerializer; import org.apache.flink.types.Either; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * A {@link TypeInformation} for the {@link Either} type of the Java API. * @@ -43,8 +48,8 @@ public class EitherTypeInfo<L, R> extends TypeInformation<Either<L, R>> { @PublicEvolving public EitherTypeInfo(TypeInformation<L> leftType, TypeInformation<R> rightType) { - this.leftType = leftType; - this.rightType = rightType; + this.leftType = checkNotNull(leftType); + this.rightType = checkNotNull(rightType); } @Override @@ -80,6 +85,15 @@ public class EitherTypeInfo<L, R> extends TypeInformation<Either<L, R>> { @Override @PublicEvolving + public Map<String, TypeInformation<?>> getGenericParameters() { + Map<String, TypeInformation<?>> m = new HashMap<>(); + m.put("L", this.leftType); + m.put("R", this.rightType); + return m; + } + + @Override + @PublicEvolving public boolean isKeyType() { return false; } http://git-wip-us.apache.org/repos/asf/flink/blob/d4d7cc32/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfoFactory.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfoFactory.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfoFactory.java new file mode 100644 index 0000000..be881a7 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfoFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.typeinfo.TypeInfoFactory; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.types.Either; + +import java.lang.reflect.Type; +import java.util.Map; + +public class EitherTypeInfoFactory<L, R> extends TypeInfoFactory<Either<L, R>> { + + @Override + public TypeInformation<Either<L, R>> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) { + TypeInformation<?> leftType = genericParameters.get("L"); + TypeInformation<?> rightType = genericParameters.get("R"); + + if (leftType == null) { + throw new InvalidTypesException("Type extraction is not possible on Either" + + " type as it does not contain information about the 'left' type."); + } + + if (rightType == null) { + throw new InvalidTypesException("Type extraction is not possible on Either" + + " type as it does not contain information about the 'right' type."); + } + + return new EitherTypeInfo(leftType, rightType); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d4d7cc32/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index 08f8c53..df4a0e0 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -18,28 +18,11 @@ package org.apache.flink.api.java.typeutils; -import java.lang.reflect.Constructor; -import java.lang.reflect.Field; -import java.lang.reflect.GenericArrayType; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.lang.reflect.Modifier; -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; -import java.lang.reflect.TypeVariable; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.HashMap; -import java.util.List; - -import java.util.Map; import org.apache.avro.specific.SpecificRecordBase; - import org.apache.commons.lang3.ClassUtils; -import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Public; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.CrossFunction; import org.apache.flink.api.common.functions.FlatJoinFunction; @@ -66,19 +49,34 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple0; import org.apache.flink.api.java.typeutils.TypeExtractionUtils.LambdaExecutable; -import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda; -import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getAllDeclaredMethods; -import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.isClassType; -import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.sameTypeVars; -import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.typeToClass; -import org.apache.flink.types.Either; import org.apache.flink.types.Value; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.GenericArrayType; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.lang.reflect.TypeVariable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda; +import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getAllDeclaredMethods; +import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.isClassType; +import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.sameTypeVars; +import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.typeToClass; /** * A utility for reflection analysis on classes, to determine the return type of implementations of transformation @@ -690,38 +688,6 @@ public class TypeExtractor { return new TupleTypeInfo(typeToClass(t), subTypesInfo); } - // check if type is a subclass of Either - else if (isClassType(t) && Either.class.isAssignableFrom(typeToClass(t))) { - Type curT = t; - - // go up the hierarchy until we reach Either (with or without generics) - // collect the types while moving up for a later top-down - while (!(isClassType(curT) && typeToClass(curT).equals(Either.class))) { - typeHierarchy.add(curT); - curT = typeToClass(curT).getGenericSuperclass(); - } - - // check if Either has generics - if (curT instanceof Class<?>) { - throw new InvalidTypesException("Either needs to be parameterized by using generics."); - } - - typeHierarchy.add(curT); - - // create the type information for the subtypes - final TypeInformation<?>[] subTypesInfo = createSubTypesInfo(t, (ParameterizedType) curT, typeHierarchy, in1Type, in2Type, false); - // type needs to be treated a pojo due to additional fields - if (subTypesInfo == null) { - if (t instanceof ParameterizedType) { - return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), (ParameterizedType) t, in1Type, in2Type); - } - else { - return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), null, in1Type, in2Type); - } - } - // return either info - return (TypeInformation<OUT>) new EitherTypeInfo(subTypesInfo[0], subTypesInfo[1]); - } // type depends on another type // e.g. class MyMapper<E> extends MapFunction<String, E> else if (t instanceof TypeVariable) { @@ -947,7 +913,7 @@ public class TypeExtractor { /** * Creates the TypeInformation for all elements of a type that expects a certain number of - * subtypes (e.g. TupleXX or Either). + * subtypes (e.g. TupleXX). * * @param originalType most concrete subclass * @param definingType type that defines the number of subtypes (e.g. Tuple2 -> 2 subtypes) @@ -1234,29 +1200,6 @@ public class TypeExtractor { validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[i], tti.getTypeAt(i)); } } - // check for Either - else if (typeInfo instanceof EitherTypeInfo) { - // check if Either at all - if (!(isClassType(type) && Either.class.isAssignableFrom(typeToClass(type)))) { - throw new InvalidTypesException("Either type expected."); - } - - // go up the hierarchy until we reach Either (with or without generics) - while (!(isClassType(type) && typeToClass(type).equals(Either.class))) { - typeHierarchy.add(type); - type = typeToClass(type).getGenericSuperclass(); - } - - // check if Either has generics - if (type instanceof Class<?>) { - throw new InvalidTypesException("Parameterized Either type expected."); - } - - EitherTypeInfo<?, ?> eti = (EitherTypeInfo<?, ?>) typeInfo; - Type[] subTypes = ((ParameterizedType) type).getActualTypeArguments(); - validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[0], eti.getLeftType()); - validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[1], eti.getRightType()); - } // check for primitive array else if (typeInfo instanceof PrimitiveArrayTypeInfo) { Type component; @@ -1675,11 +1618,6 @@ public class TypeExtractor { throw new InvalidTypesException("Type information extraction for tuples (except Tuple0) cannot be done based on the class."); } - // check for subclasses of Either - if (Either.class.isAssignableFrom(clazz)) { - throw new InvalidTypesException("Type information extraction for Either cannot be done based on the class."); - } - // check for Enums if(Enum.class.isAssignableFrom(clazz)) { return new EnumTypeInfo(clazz); @@ -1956,18 +1894,6 @@ public class TypeExtractor { } return new TupleTypeInfo(value.getClass(), infos); } - // we can not extract the types from an Either object since it only contains type information - // of one type, but from Either classes - else if (value instanceof Either) { - try { - return (TypeInformation<X>) privateCreateTypeInfo(value.getClass()); - } - catch (InvalidTypesException e) { - throw new InvalidTypesException("Automatic type extraction is not possible on an Either type " - + "as it does not contain information about both possible types. " - + "Please specify the types directly."); - } - } else { return privateGetForClass((Class<X>) value.getClass(), new ArrayList<Type>()); } http://git-wip-us.apache.org/repos/asf/flink/blob/d4d7cc32/flink-core/src/main/java/org/apache/flink/types/Either.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/Either.java b/flink-core/src/main/java/org/apache/flink/types/Either.java index d61b228..a08e968 100644 --- a/flink-core/src/main/java/org/apache/flink/types/Either.java +++ b/flink-core/src/main/java/org/apache/flink/types/Either.java @@ -19,6 +19,8 @@ package org.apache.flink.types; import org.apache.flink.annotation.Public; +import org.apache.flink.api.common.typeinfo.TypeInfo; +import org.apache.flink.api.java.typeutils.EitherTypeInfoFactory; /** * This type represents a value of one two possible types, Left or Right (a @@ -30,6 +32,7 @@ import org.apache.flink.annotation.Public; * the type of Right */ @Public +@TypeInfo(EitherTypeInfoFactory.class) public abstract class Either<L, R> { /**
