[FLINK-3602] Fix TypeExtractor and add support for recursive types This closes #1787
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7785288a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7785288a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7785288a Branch: refs/heads/master Commit: 7785288af9597995c7d95017d135ef7824493aae Parents: 77c867a Author: twalthr <[email protected]> Authored: Mon Mar 14 13:36:51 2016 +0100 Committer: Fabian Hueske <[email protected]> Committed: Tue Mar 22 13:53:58 2016 +0100 ---------------------------------------------------------------------- .../io/AvroInputFormatTypeExtractionTest.java | 2 ++ .../flink/api/java/typeutils/AvroTypeInfo.java | 6 +++-- .../flink/api/java/typeutils/TypeExtractor.java | 24 +++++++++++--------- 3 files changed, 19 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7785288a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java index 23fbab3..e245026 100644 --- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java +++ b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java @@ -58,6 +58,8 @@ public class AvroInputFormatTypeExtractionTest { public String theString; + public MyAvroType recursive; + private double aDouble; public double getaDouble() { http://git-wip-us.apache.org/repos/asf/flink/blob/7785288a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java index 0132eff..1356e53 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java @@ -50,13 +50,15 @@ public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> private static <T extends SpecificRecordBase> List<PojoField> generateFieldsFromAvroSchema(Class<T> typeClass) { PojoTypeExtractor pte = new PojoTypeExtractor(); - TypeInformation ti = pte.analyzePojo(typeClass, new ArrayList<Type>(), null, null, null); + ArrayList<Type> typeHierarchy = new ArrayList<>(); + typeHierarchy.add(typeClass); + TypeInformation ti = pte.analyzePojo(typeClass, typeHierarchy, null, null, null); if(!(ti instanceof PojoTypeInfo)) { throw new IllegalStateException("Expecting type to be a PojoTypeInfo"); } PojoTypeInfo pti = (PojoTypeInfo) ti; - List<PojoField> newFields = new ArrayList<PojoField>(pti.getTotalFields()); + List<PojoField> newFields = new ArrayList<>(pti.getTotalFields()); for(int i = 0; i < pti.getArity(); i++) { PojoField f = pti.getPojoFieldAt(i); http://git-wip-us.apache.org/repos/asf/flink/blob/7785288a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index dd4b132..fdebffd 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -1322,11 +1322,22 @@ public class TypeExtractor { private <OUT,IN1,IN2> TypeInformation<OUT> privateGetForClass(Class<OUT> clazz, ArrayList<Type> typeHierarchy, ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) { Preconditions.checkNotNull(clazz); - + + // Object is handled as generic type info if (clazz.equals(Object.class)) { + return new GenericTypeInfo<>(clazz); + } + + // Class is handled as generic type info + if (clazz.equals(Class.class)) { return new GenericTypeInfo<OUT>(clazz); } - + + // recursive types are handled as generic type info + if (countTypeInHierarchy(typeHierarchy, clazz) > 1) { + return new GenericTypeInfo<>(clazz); + } + // check for arrays if (clazz.isArray()) { @@ -1394,20 +1405,11 @@ public class TypeExtractor { return new AvroTypeInfo(clazz); } - if (countTypeInHierarchy(typeHierarchy, clazz) > 1) { - return new GenericTypeInfo<OUT>(clazz); - } - if (Modifier.isInterface(clazz.getModifiers())) { // Interface has no members and is therefore not handled as POJO return new GenericTypeInfo<OUT>(clazz); } - if (clazz.equals(Class.class)) { - // special case handling for Class, this should not be handled by the POJO logic - return new GenericTypeInfo<OUT>(clazz); - } - try { TypeInformation<OUT> pojoType = analyzePojo(clazz, new ArrayList<Type>(typeHierarchy), parameterizedType, in1Type, in2Type); if (pojoType != null) {
