This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a8847061c40bf8ca17e22e6e412a378f53b8b82d Author: Andrei Bulgakov <[email protected]> AuthorDate: Tue May 7 12:01:51 2019 +0300 [FLINK-12175] Change filling of typeHierarchy in analyzePojo, for correctly creating fields TypeInfo Co-authored-by: Dawid Wysakowicz <[email protected]> --- .../flink/api/java/typeutils/TypeExtractor.java | 37 ++++---- .../PojoParametrizedTypeExtractionTest.java | 100 +++++++++++++++++++++ .../flink/formats/avro/typeutils/AvroTypeInfo.java | 12 +-- 3 files changed, 123 insertions(+), 26 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index 318f10d..4108ee2 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -805,8 +805,9 @@ public class TypeExtractor { // go up the hierarchy until we reach immediate child of Tuple (with or without generics) // collect the types while moving up for a later top-down + List<Type> typeHierarchyForSubtypes = new ArrayList<>(typeHierarchy); while (!(isClassType(curT) && typeToClass(curT).getSuperclass().equals(Tuple.class))) { - typeHierarchy.add(curT); + typeHierarchyForSubtypes.add(curT); curT = typeToClass(curT).getGenericSuperclass(); } @@ -819,24 +820,19 @@ public class TypeExtractor { throw new InvalidTypesException("Tuple needs to be parameterized by using generics."); } - typeHierarchy.add(curT); + typeHierarchyForSubtypes.add(curT); // create the type information for the subtypes final TypeInformation<?>[] subTypesInfo = createSubTypesInfo( t, (ParameterizedType) curT, - typeHierarchy, + typeHierarchyForSubtypes, in1Type, in2Type, false); // type needs to be treated a pojo due to additional fields if (subTypesInfo == null) { - if (t instanceof ParameterizedType) { - return analyzePojo(typeToClass(t), new ArrayList<>(typeHierarchy), (ParameterizedType) t, in1Type, in2Type); - } - else { - return analyzePojo(typeToClass(t), new ArrayList<>(typeHierarchy), null, in1Type, in2Type); - } + return analyzePojo(t, new ArrayList<>(typeHierarchy), in1Type, in2Type); } // return tuple info return new TupleTypeInfo(typeToClass(t), subTypesInfo); @@ -1692,7 +1688,8 @@ public class TypeExtractor { } try { - TypeInformation<OUT> pojoType = analyzePojo(clazz, new ArrayList<>(typeHierarchy), parameterizedType, in1Type, in2Type); + Type t = parameterizedType != null ? parameterizedType : clazz; + TypeInformation<OUT> pojoType = analyzePojo(t, new ArrayList<>(typeHierarchy), in1Type, in2Type); if (pojoType != null) { return pojoType; } @@ -1772,9 +1769,13 @@ public class TypeExtractor { } @SuppressWarnings("unchecked") - protected <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo(Class<OUT> clazz, List<Type> typeHierarchy, - ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) { + protected <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo( + Type type, + List<Type> typeHierarchy, + TypeInformation<IN1> in1Type, + TypeInformation<IN2> in2Type) { + Class<OUT> clazz = typeToClass(type); if (!Modifier.isPublic(clazz.getModifiers())) { LOG.info("Class " + clazz.getName() + " is not public so it cannot be used as a POJO type " + "and must be processed as GenericType. Please read the Flink documentation " + @@ -1782,14 +1783,8 @@ public class TypeExtractor { return new GenericTypeInfo<>(clazz); } - // add the hierarchy of the POJO itself if it is generic - if (parameterizedType != null) { - getTypeHierarchy(typeHierarchy, parameterizedType, Object.class); - } - // create a type hierarchy, if the incoming only contains the most bottom one or none. - else if (typeHierarchy.size() <= 1) { - getTypeHierarchy(typeHierarchy, clazz, Object.class); - } + // add the hierarchy of the POJO + getTypeHierarchy(typeHierarchy, type, Object.class); List<Field> fields = getAllDeclaredFields(clazz, false); if (fields.size() == 0) { @@ -1950,7 +1945,7 @@ public class TypeExtractor { int numFields = t.getArity(); if(numFields != countFieldsInClass(value.getClass())) { // not a tuple since it has more fields. - return analyzePojo((Class<X>) value.getClass(), new ArrayList<Type>(), null, null, null); // we immediately call analyze Pojo here, because + return analyzePojo(value.getClass(), new ArrayList<>(), null, null); // we immediately call analyze Pojo here, because // there is currently no other type that can handle such a class. } diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoParametrizedTypeExtractionTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoParametrizedTypeExtractionTest.java new file mode 100644 index 0000000..bed0375 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoParametrizedTypeExtractionTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; + +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; + +/** + * Tests concerning type extraction of Parametrized Pojo and its superclasses. + */ +public class PojoParametrizedTypeExtractionTest { + @Test + public void testDirectlyCreateTypeInfo() { + final TypeInformation<ParameterizedParentImpl> directTypeInfo = + TypeExtractor.createTypeInfo(ParameterizedParentImpl.class); + + assertThat(directTypeInfo, equalTo(getParameterizedParentTypeInformation())); + } + + @Test + public void testMapReturnTypeInfo(){ + TypeInformation<ParameterizedParentImpl> expectedTypeInfo = getParameterizedParentTypeInformation(); + + TypeInformation<ParameterizedParentImpl> mapReturnTypeInfo = TypeExtractor + .getMapReturnTypes(new ConcreteMapFunction(), Types.INT); + + assertThat(mapReturnTypeInfo, equalTo(expectedTypeInfo)); + } + + private TypeInformation<ParameterizedParentImpl> getParameterizedParentTypeInformation() { + Map<String, TypeInformation<?>> nestedFields = new HashMap<>(); + nestedFields.put("digits", Types.INT); + nestedFields.put("letters", Types.STRING); + + Map<String, TypeInformation<?>> fields = new HashMap<>(); + fields.put("precise", Types.DOUBLE); + fields.put("pojoField", Types.POJO(Pojo.class, nestedFields)); + + return Types.POJO( + ParameterizedParentImpl.class, + fields + ); + } + + /** + * Representation of Pojo class with 2 fields. + */ + public static class Pojo { + public int digits; + public String letters; + } + + /** + * Representation of class which is parametrized by some pojo. + */ + public static class ParameterizedParent<T> { + public T pojoField; + } + + /** + * Implementation of ParametrizedParent parametrized by Pojo. + */ + public static class ParameterizedParentImpl extends ParameterizedParent<Pojo> { + public double precise; + } + /** + * Representation of map function for type extraction. + */ + public static class ConcreteMapFunction implements MapFunction<Integer, ParameterizedParentImpl> { + @Override + public ParameterizedParentImpl map(Integer value) throws Exception { + return null; + } + } +} diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java index 0fcee4f..9f4b469 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java @@ -29,7 +29,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.avro.specific.SpecificRecordBase; -import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.util.ArrayList; import java.util.List; @@ -66,7 +65,7 @@ public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> PojoTypeExtractor pte = new PojoTypeExtractor(); List<Type> typeHierarchy = new ArrayList<>(); typeHierarchy.add(typeClass); - TypeInformation<T> ti = pte.analyzePojo(typeClass, typeHierarchy, null, null, null); + TypeInformation<T> ti = pte.analyzePojo(typeClass, typeHierarchy, null, null); if (!(ti instanceof PojoTypeInfo)) { throw new IllegalStateException("Expecting type to be a PojoTypeInfo"); @@ -96,9 +95,12 @@ public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> } @Override - public <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo(Class<OUT> clazz, List<Type> typeHierarchy, - ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) { - return super.analyzePojo(clazz, typeHierarchy, parameterizedType, in1Type, in2Type); + public <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo( + Type type, + List<Type> typeHierarchy, + TypeInformation<IN1> in1Type, + TypeInformation<IN2> in2Type) { + return super.analyzePojo(type, typeHierarchy, in1Type, in2Type); } } }
