[FLINK-3602] Fix TypeExtractor and add support for recursive types

This closes #1787


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7785288a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7785288a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7785288a

Branch: refs/heads/master
Commit: 7785288af9597995c7d95017d135ef7824493aae
Parents: 77c867a
Author: twalthr <[email protected]>
Authored: Mon Mar 14 13:36:51 2016 +0100
Committer: Fabian Hueske <[email protected]>
Committed: Tue Mar 22 13:53:58 2016 +0100

----------------------------------------------------------------------
 .../io/AvroInputFormatTypeExtractionTest.java   |  2 ++
 .../flink/api/java/typeutils/AvroTypeInfo.java  |  6 +++--
 .../flink/api/java/typeutils/TypeExtractor.java | 24 +++++++++++---------
 3 files changed, 19 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7785288a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
 
b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
index 23fbab3..e245026 100644
--- 
a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
+++ 
b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
@@ -58,6 +58,8 @@ public class AvroInputFormatTypeExtractionTest {
 
                public String theString;
 
+               public MyAvroType recursive;
+
                private double aDouble;
 
                public double getaDouble() {

http://git-wip-us.apache.org/repos/asf/flink/blob/7785288a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
index 0132eff..1356e53 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
@@ -50,13 +50,15 @@ public class AvroTypeInfo<T extends SpecificRecordBase> 
extends PojoTypeInfo<T>
 
        private static <T extends SpecificRecordBase> List<PojoField> 
generateFieldsFromAvroSchema(Class<T> typeClass) {
                PojoTypeExtractor pte = new PojoTypeExtractor();
-               TypeInformation ti = pte.analyzePojo(typeClass, new 
ArrayList<Type>(), null, null, null);
+               ArrayList<Type> typeHierarchy = new ArrayList<>();
+               typeHierarchy.add(typeClass);
+               TypeInformation ti = pte.analyzePojo(typeClass, typeHierarchy, 
null, null, null);
 
                if(!(ti instanceof PojoTypeInfo)) {
                        throw new IllegalStateException("Expecting type to be a 
PojoTypeInfo");
                }
                PojoTypeInfo pti =  (PojoTypeInfo) ti;
-               List<PojoField> newFields = new 
ArrayList<PojoField>(pti.getTotalFields());
+               List<PojoField> newFields = new 
ArrayList<>(pti.getTotalFields());
 
                for(int i = 0; i < pti.getArity(); i++) {
                        PojoField f = pti.getPojoFieldAt(i);

http://git-wip-us.apache.org/repos/asf/flink/blob/7785288a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index dd4b132..fdebffd 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -1322,11 +1322,22 @@ public class TypeExtractor {
        private <OUT,IN1,IN2> TypeInformation<OUT> 
privateGetForClass(Class<OUT> clazz, ArrayList<Type> typeHierarchy,
                        ParameterizedType parameterizedType, 
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
                Preconditions.checkNotNull(clazz);
-               
+
+               // Object is handled as generic type info
                if (clazz.equals(Object.class)) {
+                       return new GenericTypeInfo<>(clazz);
+               }
+
+               // Class is handled as generic type info
+               if (clazz.equals(Class.class)) {
                        return new GenericTypeInfo<OUT>(clazz);
                }
-               
+
+               // recursive types are handled as generic type info
+               if (countTypeInHierarchy(typeHierarchy, clazz) > 1) {
+                       return new GenericTypeInfo<>(clazz);
+               }
+
                // check for arrays
                if (clazz.isArray()) {
 
@@ -1394,20 +1405,11 @@ public class TypeExtractor {
                        return new AvroTypeInfo(clazz);
                }
 
-               if (countTypeInHierarchy(typeHierarchy, clazz) > 1) {
-                       return new GenericTypeInfo<OUT>(clazz);
-               }
-
                if (Modifier.isInterface(clazz.getModifiers())) {
                        // Interface has no members and is therefore not 
handled as POJO
                        return new GenericTypeInfo<OUT>(clazz);
                }
 
-               if (clazz.equals(Class.class)) {
-                       // special case handling for Class, this should not be 
handled by the POJO logic
-                       return new GenericTypeInfo<OUT>(clazz);
-               }
-
                try {
                        TypeInformation<OUT> pojoType = analyzePojo(clazz, new 
ArrayList<Type>(typeHierarchy), parameterizedType, in1Type, in2Type);
                        if (pojoType != null) {

Reply via email to