This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d326b05  [FLINK-12141] [API/Type Serialization System] Allow @TypeInfo 
annotation on POJO field declarations (#8344)
d326b05 is described below

commit d326b0574a373bd5eef63a44261f8762709265f8
Author: Leeviiii <[email protected]>
AuthorDate: Tue Aug 17 02:44:46 2021 +0800

    [FLINK-12141] [API/Type Serialization System] Allow @TypeInfo annotation on 
POJO field declarations (#8344)
    
    - build on existing code, extend checks, tests, and docs
    
    Co-authored-by: yangfei5 <[email protected]>
    Co-authored-by: Nico Kruber <[email protected]>
---
 .../serialization/types_serialization.md           | 15 ++++-
 .../apache/flink/api/common/typeinfo/TypeInfo.java |  2 +-
 .../flink/api/java/typeutils/TypeExtractor.java    | 53 ++++++++++++++--
 .../api/java/typeutils/TypeInfoFactoryTest.java    | 71 ++++++++++++++++++++++
 4 files changed, 133 insertions(+), 8 deletions(-)

diff --git 
a/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
 
b/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
index e289df9..a490dc0 100644
--- 
a/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
+++ 
b/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
@@ -522,8 +522,8 @@ env.getConfig().disableGenericTypes();
 
 A type information factory allows for plugging-in user-defined type 
information into the Flink type system.
 You have to implement `org.apache.flink.api.common.typeinfo.TypeInfoFactory` 
to return your custom type information. 
-The factory is called during the type extraction phase if the corresponding 
type has been annotated 
-with the `@org.apache.flink.api.common.typeinfo.TypeInfo` annotation. 
+The factory is called during the type extraction phase if either the 
corresponding type or a POJO's field using
+this type has been annotated with the 
`@org.apache.flink.api.common.typeinfo.TypeInfo` annotation.
 
 Type information factories can be used in both the Java and Scala API.
 
@@ -553,6 +553,17 @@ public class MyTupleTypeInfoFactory extends 
TypeInfoFactory<MyTuple> {
 }
 ```
 
+Instead of annotating the type itself, which may not be possible for 
third-party code, you can also
+annotate the usage of this type inside a valid Flink POJO like this:
+```java
+public class MyPojo {
+  public int id;
+
+  @TypeInfo(MyTupleTypeInfoFactory.class)
+  public MyTuple<Integer, String> tuple;
+}
+```
+
 The method `createTypeInfo(Type, Map<String, TypeInformation<?>>)` creates 
type information for the type the factory is targeted for. 
 The parameters provide additional information about the type itself as well as 
the type's generic type parameters if available.
 
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfo.java 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfo.java
index 9aa01ac..84980be 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfo.java
@@ -35,7 +35,7 @@ import java.lang.reflect.Type;
  * has highest precedence (see {@link TypeExtractor#registerFactory(Type, 
Class)}).
  */
 @Documented
-@Target(ElementType.TYPE)
+@Target({ElementType.TYPE, ElementType.FIELD})
 @Retention(RetentionPolicy.RUNTIME)
 @Public
 public @interface TypeInfo {
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 9cef04c..c6fb875 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -1308,6 +1308,19 @@ public class TypeExtractor {
         }
         final Type factoryDefiningType = 
factoryHierarchy.get(factoryHierarchy.size() - 1);
 
+        return createTypeInfoFromFactory(
+                t, in1Type, in2Type, factoryHierarchy, factory, 
factoryDefiningType);
+    }
+
+    /** Creates type information using a given factory. */
+    @SuppressWarnings("unchecked")
+    private <IN1, IN2, OUT> TypeInformation<OUT> createTypeInfoFromFactory(
+            Type t,
+            TypeInformation<IN1> in1Type,
+            TypeInformation<IN2> in2Type,
+            List<Type> factoryHierarchy,
+            TypeInfoFactory<? super OUT> factory,
+            Type factoryDefiningType) {
         // infer possible type parameters from input
         final Map<String, TypeInformation<?>> genericParams;
         if (factoryDefiningType instanceof ParameterizedType) {
@@ -1689,6 +1702,23 @@ public class TypeExtractor {
         return (TypeInfoFactory<OUT>) 
InstantiationUtil.instantiate(factoryClass);
     }
 
+    /** Returns the field's @TypeInfo factory, or null if unannotated or not a class type. */
+    @Internal
+    @SuppressWarnings("unchecked")
+    public static <OUT> TypeInfoFactory<OUT> getTypeInfoFactory(Field field) {
+        if (!isClassType(field.getType()) || 
!field.isAnnotationPresent(TypeInfo.class)) {
+            return null;
+        }
+
+        Class<?> factoryClass = field.getAnnotation(TypeInfo.class).value();
+        // check for valid factory class
+        if (!TypeInfoFactory.class.isAssignableFrom(factoryClass)) {
+            throw new InvalidTypesException(
+                    "TypeInfo annotation does not specify a valid 
TypeInfoFactory.");
+        }
+        return (TypeInfoFactory<OUT>) 
InstantiationUtil.instantiate(factoryClass);
+    }
+
     /** @return number of items with equal type or same raw type */
     private static int countTypeInHierarchy(List<Type> typeHierarchy, Type 
type) {
         int count = 0;
@@ -2043,12 +2073,25 @@ public class TypeExtractor {
                 return null;
             }
             try {
+                final TypeInformation<?> typeInfo;
                 List<Type> fieldTypeHierarchy = new ArrayList<>(typeHierarchy);
-                fieldTypeHierarchy.add(fieldType);
-                TypeInformation<?> ti =
-                        createTypeInfoWithTypeHierarchy(
-                                fieldTypeHierarchy, fieldType, in1Type, 
in2Type);
-                pojoFields.add(new PojoField(field, ti));
+                TypeInfoFactory factory = getTypeInfoFactory(field);
+                if (factory != null) {
+                    typeInfo =
+                            createTypeInfoFromFactory(
+                                    fieldType,
+                                    in1Type,
+                                    in2Type,
+                                    fieldTypeHierarchy,
+                                    factory,
+                                    fieldType);
+                } else {
+                    fieldTypeHierarchy.add(fieldType);
+                    typeInfo =
+                            createTypeInfoWithTypeHierarchy(
+                                    fieldTypeHierarchy, fieldType, in1Type, 
in2Type);
+                }
+                pojoFields.add(new PojoField(field, typeInfo));
             } catch (InvalidTypesException e) {
                 Class<?> genericClass = Object.class;
                 if (isClassType(fieldType)) {
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoFactoryTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoFactoryTest.java
index d97a916..b23245e 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoFactoryTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoFactoryTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.java.typeutils;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -38,6 +39,7 @@ import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.BOOLEAN_TYPE_IN
 import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.DOUBLE_TYPE_INFO;
 import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.FLOAT_TYPE_INFO;
 import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO;
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
 import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -129,6 +131,40 @@ public class TypeInfoFactoryTest {
         assertEquals(BOOLEAN_TYPE_INFO, mtti.getField1());
     }
 
+    @Test
+    public void testWithFieldTypeInfoAnnotation() {
+        TypeInformation<WithFieldTypeInfoAnnotation<Double, String>> 
typeWithAnnotation =
+                TypeInformation.of(new 
TypeHint<WithFieldTypeInfoAnnotation<Double, String>>() {});
+        TypeInformation<WithoutFieldTypeInfoAnnotation<Double, String>> 
typeWithoutAnnotation =
+                TypeInformation.of(
+                        new TypeHint<WithoutFieldTypeInfoAnnotation<Double, 
String>>() {});
+
+        assertTrue(typeWithAnnotation instanceof PojoTypeInfo);
+        assertTrue(typeWithoutAnnotation instanceof PojoTypeInfo);
+        PojoTypeInfo<?> pojoTypeWithAnnotation = (PojoTypeInfo<?>) 
typeWithAnnotation;
+        PojoTypeInfo<?> pojoTypeWithoutAnnotation = (PojoTypeInfo<?>) 
typeWithoutAnnotation;
+
+        // field outerEither
+        assertTrue(pojoTypeWithAnnotation.getTypeAt(1) instanceof 
EitherTypeInfo);
+        assertTrue(pojoTypeWithoutAnnotation.getTypeAt(1) instanceof 
GenericTypeInfo);
+        // field id: type info from field annotation that overrides the class 
annotation:
+        assertEquals(LONG_TYPE_INFO, pojoTypeWithAnnotation.getTypeAt(0));
+        assertEquals(INT_TYPE_INFO, pojoTypeWithoutAnnotation.getTypeAt(0));
+
+        MapFunction<Boolean, WithFieldTypeInfoAnnotation<Boolean, String>> f =
+                new WithFieldTypeInfoAnnotationMapper<>();
+        TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, 
BOOLEAN_TYPE_INFO);
+        assertTrue(ti instanceof PojoTypeInfo);
+        PojoTypeInfo<?> tiPojo = (PojoTypeInfo<?>) ti;
+        // field outerEither
+        assertTrue(tiPojo.getTypeAt(1) instanceof EitherTypeInfo);
+        EitherTypeInfo eti = (EitherTypeInfo) tiPojo.getTypeAt(1);
+        assertEquals(BOOLEAN_TYPE_INFO, eti.getLeftType());
+        assertEquals(STRING_TYPE_INFO, eti.getRightType());
+        // field id: type info from field annotation that overrides the class 
annotation:
+        assertEquals(LONG_TYPE_INFO, tiPojo.getTypeAt(0));
+    }
+
     @Test(expected = InvalidTypesException.class)
     public void testMissingTypeInfo() {
         MapFunction f = new MyFaultyMapper();
@@ -484,4 +520,39 @@ public class TypeInfoFactoryTest {
             return (TypeInformation) INT_TYPE_INFO;
         }
     }
+
+    public static class IntLikeTypeInfoFactory2 extends 
TypeInfoFactory<IntLike> {
+        @Override
+        @SuppressWarnings("unchecked")
+        public TypeInformation<IntLike> createTypeInfo(
+                Type t, Map<String, TypeInformation<?>> genericParams) {
+            return (TypeInformation) LONG_TYPE_INFO;
+        }
+    }
+
+    // assumption: stands in for a type from an external package that we cannot annotate itself
+    public static class OuterEither<A, B> {
+        // empty
+    }
+
+    public static class WithFieldTypeInfoAnnotation<A, B> {
+        @TypeInfo(MyEitherTypeInfoFactory.class)
+        public OuterEither<A, B> outerEither;
+
+        @TypeInfo(IntLikeTypeInfoFactory2.class)
+        public IntLike id;
+    }
+
+    public static class WithoutFieldTypeInfoAnnotation<A, B> {
+        public OuterEither<A, B> outerEither;
+        public IntLike id;
+    }
+
+    public static class WithFieldTypeInfoAnnotationMapper<T>
+            implements MapFunction<T, WithFieldTypeInfoAnnotation<T, String>> {
+        @Override
+        public WithFieldTypeInfoAnnotation<T, String> map(T value) throws 
Exception {
+            return null;
+        }
+    }
 }

Reply via email to