This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a8847061c40bf8ca17e22e6e412a378f53b8b82d
Author: Andrei Bulgakov <[email protected]>
AuthorDate: Tue May 7 12:01:51 2019 +0300

    [FLINK-12175] Change filling of typeHierarchy in analyzePojo, for correctly 
creating fields TypeInfo
    
    Co-authored-by: Dawid Wysakowicz <[email protected]>
---
 .../flink/api/java/typeutils/TypeExtractor.java    |  37 ++++----
 .../PojoParametrizedTypeExtractionTest.java        | 100 +++++++++++++++++++++
 .../flink/formats/avro/typeutils/AvroTypeInfo.java |  12 +--
 3 files changed, 123 insertions(+), 26 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 318f10d..4108ee2 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -805,8 +805,9 @@ public class TypeExtractor {
 
                        // go up the hierarchy until we reach immediate child 
of Tuple (with or without generics)
                        // collect the types while moving up for a later 
top-down
+                       List<Type> typeHierarchyForSubtypes = new 
ArrayList<>(typeHierarchy);
                        while (!(isClassType(curT) && 
typeToClass(curT).getSuperclass().equals(Tuple.class))) {
-                               typeHierarchy.add(curT);
+                               typeHierarchyForSubtypes.add(curT);
                                curT = typeToClass(curT).getGenericSuperclass();
                        }
 
@@ -819,24 +820,19 @@ public class TypeExtractor {
                                throw new InvalidTypesException("Tuple needs to 
be parameterized by using generics.");
                        }
 
-                       typeHierarchy.add(curT);
+                       typeHierarchyForSubtypes.add(curT);
 
                        // create the type information for the subtypes
                        final TypeInformation<?>[] subTypesInfo = 
createSubTypesInfo(
                                t,
                                (ParameterizedType) curT,
-                               typeHierarchy,
+                               typeHierarchyForSubtypes,
                                in1Type,
                                in2Type,
                                false);
                        // type needs to be treated a pojo due to additional 
fields
                        if (subTypesInfo == null) {
-                               if (t instanceof ParameterizedType) {
-                                       return analyzePojo(typeToClass(t), new 
ArrayList<>(typeHierarchy), (ParameterizedType) t, in1Type, in2Type);
-                               }
-                               else {
-                                       return analyzePojo(typeToClass(t), new 
ArrayList<>(typeHierarchy), null, in1Type, in2Type);
-                               }
+                               return analyzePojo(t, new 
ArrayList<>(typeHierarchy), in1Type, in2Type);
                        }
                        // return tuple info
                        return new TupleTypeInfo(typeToClass(t), subTypesInfo);
@@ -1692,7 +1688,8 @@ public class TypeExtractor {
                }
 
                try {
-                       TypeInformation<OUT> pojoType = analyzePojo(clazz, new 
ArrayList<>(typeHierarchy), parameterizedType, in1Type, in2Type);
+                       Type t = parameterizedType != null ? parameterizedType 
: clazz;
+                       TypeInformation<OUT> pojoType = analyzePojo(t, new 
ArrayList<>(typeHierarchy), in1Type, in2Type);
                        if (pojoType != null) {
                                return pojoType;
                        }
@@ -1772,9 +1769,13 @@ public class TypeExtractor {
        }
 
        @SuppressWarnings("unchecked")
-       protected <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo(Class<OUT> 
clazz, List<Type> typeHierarchy,
-                       ParameterizedType parameterizedType, 
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
+       protected <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo(
+                       Type type,
+                       List<Type> typeHierarchy,
+                       TypeInformation<IN1> in1Type,
+                       TypeInformation<IN2> in2Type) {
 
+               Class<OUT> clazz = typeToClass(type);
                if (!Modifier.isPublic(clazz.getModifiers())) {
                        LOG.info("Class " + clazz.getName() + " is not public 
so it cannot be used as a POJO type " +
                                "and must be processed as GenericType. Please 
read the Flink documentation " +
@@ -1782,14 +1783,8 @@ public class TypeExtractor {
                        return new GenericTypeInfo<>(clazz);
                }
 
-               // add the hierarchy of the POJO itself if it is generic
-               if (parameterizedType != null) {
-                       getTypeHierarchy(typeHierarchy, parameterizedType, 
Object.class);
-               }
-               // create a type hierarchy, if the incoming only contains the 
most bottom one or none.
-               else if (typeHierarchy.size() <= 1) {
-                       getTypeHierarchy(typeHierarchy, clazz, Object.class);
-               }
+               // add the hierarchy of the POJO
+               getTypeHierarchy(typeHierarchy, type, Object.class);
 
                List<Field> fields = getAllDeclaredFields(clazz, false);
                if (fields.size() == 0) {
@@ -1950,7 +1945,7 @@ public class TypeExtractor {
                        int numFields = t.getArity();
                        if(numFields != countFieldsInClass(value.getClass())) {
                                // not a tuple since it has more fields.
-                               return analyzePojo((Class<X>) value.getClass(), 
new ArrayList<Type>(), null, null, null); // we immediately call analyze Pojo 
here, because
+                               return analyzePojo(value.getClass(), new 
ArrayList<>(), null, null); // we immediately call analyze Pojo here, because
                                // there is currently no other type that can 
handle such a class.
                        }
 
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoParametrizedTypeExtractionTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoParametrizedTypeExtractionTest.java
new file mode 100644
index 0000000..bed0375
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoParametrizedTypeExtractionTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ *  Tests concerning type extraction of Parametrized Pojo and its superclasses.
+ */
+public class PojoParametrizedTypeExtractionTest {
+       @Test
+       public void testDirectlyCreateTypeInfo() {
+               final TypeInformation<ParameterizedParentImpl> directTypeInfo =
+                       
TypeExtractor.createTypeInfo(ParameterizedParentImpl.class);
+
+               assertThat(directTypeInfo, 
equalTo(getParameterizedParentTypeInformation()));
+       }
+
+       @Test
+       public void testMapReturnTypeInfo(){
+               TypeInformation<ParameterizedParentImpl> expectedTypeInfo = 
getParameterizedParentTypeInformation();
+
+               TypeInformation<ParameterizedParentImpl> mapReturnTypeInfo = 
TypeExtractor
+                       .getMapReturnTypes(new ConcreteMapFunction(), 
Types.INT);
+
+               assertThat(mapReturnTypeInfo, equalTo(expectedTypeInfo));
+       }
+
+       private TypeInformation<ParameterizedParentImpl> 
getParameterizedParentTypeInformation() {
+               Map<String, TypeInformation<?>> nestedFields = new HashMap<>();
+               nestedFields.put("digits", Types.INT);
+               nestedFields.put("letters", Types.STRING);
+
+               Map<String, TypeInformation<?>> fields = new HashMap<>();
+               fields.put("precise", Types.DOUBLE);
+               fields.put("pojoField", Types.POJO(Pojo.class, nestedFields));
+
+               return Types.POJO(
+                       ParameterizedParentImpl.class,
+                       fields
+               );
+       }
+
+       /**
+        * Representation of Pojo class with 2 fields.
+        */
+       public static class Pojo {
+               public int digits;
+               public String letters;
+       }
+
+       /**
+        * Representation of class which is parametrized by some pojo.
+        */
+       public static class ParameterizedParent<T> {
+               public T pojoField;
+       }
+
+       /**
+        * Implementation of ParameterizedParent parameterized by Pojo.
+        */
+       public static class ParameterizedParentImpl extends 
ParameterizedParent<Pojo> {
+               public double precise;
+       }
+       /**
+        * Representation of map function for type extraction.
+        */
+       public static class ConcreteMapFunction implements MapFunction<Integer, 
ParameterizedParentImpl> {
+               @Override
+               public ParameterizedParentImpl map(Integer value) throws 
Exception {
+                       return null;
+               }
+       }
+}
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
index 0fcee4f..9f4b469 100644
--- 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
@@ -29,7 +29,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 
 import org.apache.avro.specific.SpecificRecordBase;
 
-import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.List;
@@ -66,7 +65,7 @@ public class AvroTypeInfo<T extends SpecificRecordBase> 
extends PojoTypeInfo<T>
                        PojoTypeExtractor pte = new PojoTypeExtractor();
                        List<Type> typeHierarchy = new ArrayList<>();
                        typeHierarchy.add(typeClass);
-                       TypeInformation<T> ti = pte.analyzePojo(typeClass, 
typeHierarchy, null, null, null);
+                       TypeInformation<T> ti = pte.analyzePojo(typeClass, 
typeHierarchy, null, null);
 
                        if (!(ti instanceof PojoTypeInfo)) {
                                throw new IllegalStateException("Expecting type 
to be a PojoTypeInfo");
@@ -96,9 +95,12 @@ public class AvroTypeInfo<T extends SpecificRecordBase> 
extends PojoTypeInfo<T>
                }
 
                @Override
-               public <OUT, IN1, IN2> TypeInformation<OUT> 
analyzePojo(Class<OUT> clazz, List<Type> typeHierarchy,
-                               ParameterizedType parameterizedType, 
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
-                       return super.analyzePojo(clazz, typeHierarchy, 
parameterizedType, in1Type, in2Type);
+               public <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo(
+                               Type type,
+                               List<Type> typeHierarchy,
+                               TypeInformation<IN1> in1Type,
+                               TypeInformation<IN2> in2Type) {
+                       return super.analyzePojo(type, typeHierarchy, in1Type, 
in2Type);
                }
        }
 }

Reply via email to