Repository: flink
Updated Branches:
  refs/heads/master f11447e58 -> d4d7cc326


[FLINK-4673] [core] TypeInfoFactory for Either type

Removes from TypeExtractor the explicit parsing for Either and adds an
EitherTypeInfoFactory.

This closes #2545.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d4d7cc32
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d4d7cc32
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d4d7cc32

Branch: refs/heads/master
Commit: d4d7cc32667016d66c65a7d64601cabd101a0c4d
Parents: f11447e
Author: Greg Hogan <[email protected]>
Authored: Fri Sep 23 14:59:36 2016 -0400
Committer: twalthr <[email protected]>
Committed: Tue Jan 10 14:52:38 2017 +0100

----------------------------------------------------------------------
 .../api/java/typeutils/EitherTypeInfo.java      |  20 +++-
 .../java/typeutils/EitherTypeInfoFactory.java   |  48 ++++++++
 .../flink/api/java/typeutils/TypeExtractor.java | 120 ++++---------------
 .../java/org/apache/flink/types/Either.java     |   3 +
 4 files changed, 91 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d4d7cc32/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
index 058de12..d4d1aa9 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
@@ -18,14 +18,19 @@
 
 package org.apache.flink.api.java.typeutils;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
 import org.apache.flink.types.Either;
 
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A {@link TypeInformation} for the {@link Either} type of the Java API.
  *
@@ -43,8 +48,8 @@ public class EitherTypeInfo<L, R> extends 
TypeInformation<Either<L, R>> {
 
        @PublicEvolving
        public EitherTypeInfo(TypeInformation<L> leftType, TypeInformation<R> 
rightType) {
-               this.leftType = leftType;
-               this.rightType = rightType;
+               this.leftType = checkNotNull(leftType);
+               this.rightType = checkNotNull(rightType);
        }
 
        @Override
@@ -80,6 +85,15 @@ public class EitherTypeInfo<L, R> extends 
TypeInformation<Either<L, R>> {
 
        @Override
        @PublicEvolving
+       public Map<String, TypeInformation<?>> getGenericParameters() {
+               Map<String, TypeInformation<?>> m = new HashMap<>();
+               m.put("L", this.leftType);
+               m.put("R", this.rightType);
+               return m;
+       }
+
+       @Override
+       @PublicEvolving
        public boolean isKeyType() {
                return false;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/d4d7cc32/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfoFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfoFactory.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfoFactory.java
new file mode 100644
index 0000000..be881a7
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfoFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.types.Either;
+
+import java.lang.reflect.Type;
+import java.util.Map;
+
+public class EitherTypeInfoFactory<L, R> extends TypeInfoFactory<Either<L, R>> 
{
+
+       @Override
+       public TypeInformation<Either<L, R>> createTypeInfo(Type t, Map<String, 
TypeInformation<?>> genericParameters) {
+               TypeInformation<?> leftType = genericParameters.get("L");
+               TypeInformation<?> rightType = genericParameters.get("R");
+
+               if (leftType == null) {
+                       throw new InvalidTypesException("Type extraction is not 
possible on Either" +
+                               " type as it does not contain information about 
the 'left' type.");
+               }
+
+               if (rightType == null) {
+                       throw new InvalidTypesException("Type extraction is not 
possible on Either" +
+                               " type as it does not contain information about 
the 'right' type.");
+               }
+
+               return new EitherTypeInfo(leftType, rightType);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d4d7cc32/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 08f8c53..df4a0e0 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -18,28 +18,11 @@
 
 package org.apache.flink.api.java.typeutils;
 
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Field;
-import java.lang.reflect.GenericArrayType;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.lang.reflect.TypeVariable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.HashMap;
-import java.util.List;
-
-import java.util.Map;
 import org.apache.avro.specific.SpecificRecordBase;
-
 import org.apache.commons.lang3.ClassUtils;
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
@@ -66,19 +49,34 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple0;
 import 
org.apache.flink.api.java.typeutils.TypeExtractionUtils.LambdaExecutable;
-import static 
org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda;
-import static 
org.apache.flink.api.java.typeutils.TypeExtractionUtils.getAllDeclaredMethods;
-import static 
org.apache.flink.api.java.typeutils.TypeExtractionUtils.isClassType;
-import static 
org.apache.flink.api.java.typeutils.TypeExtractionUtils.sameTypeVars;
-import static 
org.apache.flink.api.java.typeutils.TypeExtractionUtils.typeToClass;
-import org.apache.flink.types.Either;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.GenericArrayType;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.lang.reflect.TypeVariable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static 
org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda;
+import static 
org.apache.flink.api.java.typeutils.TypeExtractionUtils.getAllDeclaredMethods;
+import static 
org.apache.flink.api.java.typeutils.TypeExtractionUtils.isClassType;
+import static 
org.apache.flink.api.java.typeutils.TypeExtractionUtils.sameTypeVars;
+import static 
org.apache.flink.api.java.typeutils.TypeExtractionUtils.typeToClass;
 
 /**
  * A utility for reflection analysis on classes, to determine the return type 
of implementations of transformation
@@ -690,38 +688,6 @@ public class TypeExtractor {
                        return new TupleTypeInfo(typeToClass(t), subTypesInfo);
                        
                }
-               // check if type is a subclass of Either
-               else if (isClassType(t) && 
Either.class.isAssignableFrom(typeToClass(t))) {
-                       Type curT = t;
-
-                       // go up the hierarchy until we reach Either (with or 
without generics)
-                       // collect the types while moving up for a later 
top-down
-                       while (!(isClassType(curT) && 
typeToClass(curT).equals(Either.class))) {
-                               typeHierarchy.add(curT);
-                               curT = typeToClass(curT).getGenericSuperclass();
-                       }
-
-                       // check if Either has generics
-                       if (curT instanceof Class<?>) {
-                               throw new InvalidTypesException("Either needs 
to be parameterized by using generics.");
-                       }
-
-                       typeHierarchy.add(curT);
-
-                       // create the type information for the subtypes
-                       final TypeInformation<?>[] subTypesInfo = 
createSubTypesInfo(t, (ParameterizedType) curT, typeHierarchy, in1Type, 
in2Type, false);
-                       // type needs to be treated a pojo due to additional 
fields
-                       if (subTypesInfo == null) {
-                               if (t instanceof ParameterizedType) {
-                                       return (TypeInformation<OUT>) 
analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), 
(ParameterizedType) t, in1Type, in2Type);
-                               }
-                               else {
-                                       return (TypeInformation<OUT>) 
analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), null, in1Type, 
in2Type);
-                               }
-                       }
-                       // return either info
-                       return (TypeInformation<OUT>) new 
EitherTypeInfo(subTypesInfo[0], subTypesInfo[1]);
-               }
                // type depends on another type
                // e.g. class MyMapper<E> extends MapFunction<String, E>
                else if (t instanceof TypeVariable) {
@@ -947,7 +913,7 @@ public class TypeExtractor {
 
        /**
         * Creates the TypeInformation for all elements of a type that expects 
a certain number of
-        * subtypes (e.g. TupleXX or Either).
+        * subtypes (e.g. TupleXX).
         *
         * @param originalType most concrete subclass
         * @param definingType type that defines the number of subtypes (e.g. 
Tuple2 -> 2 subtypes)
@@ -1234,29 +1200,6 @@ public class TypeExtractor {
                                        validateInfo(new 
ArrayList<Type>(typeHierarchy), subTypes[i], tti.getTypeAt(i));
                                }
                        }
-                       // check for Either
-                       else if (typeInfo instanceof EitherTypeInfo) {
-                               // check if Either at all
-                               if (!(isClassType(type) && 
Either.class.isAssignableFrom(typeToClass(type)))) {
-                                       throw new InvalidTypesException("Either 
type expected.");
-                               }
-
-                               // go up the hierarchy until we reach Either 
(with or without generics)
-                               while (!(isClassType(type) && 
typeToClass(type).equals(Either.class))) {
-                                       typeHierarchy.add(type);
-                                       type = 
typeToClass(type).getGenericSuperclass();
-                               }
-
-                               // check if Either has generics
-                               if (type instanceof Class<?>) {
-                                       throw new 
InvalidTypesException("Parameterized Either type expected.");
-                               }
-
-                               EitherTypeInfo<?, ?> eti = (EitherTypeInfo<?, 
?>) typeInfo;
-                               Type[] subTypes = ((ParameterizedType) 
type).getActualTypeArguments();
-                               validateInfo(new 
ArrayList<Type>(typeHierarchy), subTypes[0], eti.getLeftType());
-                               validateInfo(new 
ArrayList<Type>(typeHierarchy), subTypes[1], eti.getRightType());
-                       }
                        // check for primitive array
                        else if (typeInfo instanceof PrimitiveArrayTypeInfo) {
                                Type component;
@@ -1675,11 +1618,6 @@ public class TypeExtractor {
                        throw new InvalidTypesException("Type information 
extraction for tuples (except Tuple0) cannot be done based on the class.");
                }
 
-               // check for subclasses of Either
-               if (Either.class.isAssignableFrom(clazz)) {
-                       throw new InvalidTypesException("Type information 
extraction for Either cannot be done based on the class.");
-               }
-
                // check for Enums
                if(Enum.class.isAssignableFrom(clazz)) {
                        return new EnumTypeInfo(clazz);
@@ -1956,18 +1894,6 @@ public class TypeExtractor {
                        }
                        return new TupleTypeInfo(value.getClass(), infos);
                }
-               // we can not extract the types from an Either object since it 
only contains type information
-               // of one type, but from Either classes
-               else if (value instanceof Either) {
-                       try {
-                               return (TypeInformation<X>) 
privateCreateTypeInfo(value.getClass());
-                       }
-                       catch (InvalidTypesException e) {
-                               throw new InvalidTypesException("Automatic type 
extraction is not possible on an Either type "
-                                               + "as it does not contain 
information about both possible types. "
-                                               + "Please specify the types 
directly.");
-                       }
-               }
                else {
                        return privateGetForClass((Class<X>) value.getClass(), 
new ArrayList<Type>());
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/d4d7cc32/flink-core/src/main/java/org/apache/flink/types/Either.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/Either.java 
b/flink-core/src/main/java/org/apache/flink/types/Either.java
index d61b228..a08e968 100644
--- a/flink-core/src/main/java/org/apache/flink/types/Either.java
+++ b/flink-core/src/main/java/org/apache/flink/types/Either.java
@@ -19,6 +19,8 @@
 package org.apache.flink.types;
 
 import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.typeinfo.TypeInfo;
+import org.apache.flink.api.java.typeutils.EitherTypeInfoFactory;
 
 /**
  * This type represents a value of one two possible types, Left or Right (a
@@ -30,6 +32,7 @@ import org.apache.flink.annotation.Public;
  *            the type of Right
  */
 @Public
+@TypeInfo(EitherTypeInfoFactory.class)
 public abstract class Either<L, R> {
 
        /**

Reply via email to