[ 
https://issues.apache.org/jira/browse/FLINK-26470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Antoine Michaud updated FLINK-26470:
------------------------------------
    Description: 
h2. Problem:
h4. Basic collections (List, Map) and custom types are not compatible with Flink POJO serialization.

Here is the exception:

 
{code:java}
Generic types have been disabled in the ExecutionConfig and type java.util.List 
is treated as a generic type.
java.lang.UnsupportedOperationException: Generic types have been disabled in 
the ExecutionConfig and type java.util.List is treated as a generic type.
    at 
org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
    at 
org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:346)
[... nothing interesting ...]{code}
 
h2. Explanation:

As the docs say, we should not use Kryo in production since it performs poorly.

To stop using Kryo and use the native POJO serialization, we do this:
{code:java}
env.getConfig().disableGenericTypes(){code}
 

But POJOs have to meet [some requirements|https://nightlies.apache.org/flink/flink-docs-release-1.10/dev/types_serialization.html#rules-for-pojo-types].
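
For illustration, a minimal reproducer of the exception above (the Repro and Order names are hypothetical):
{code:java}
import java.util.List;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Repro {
    // Meets the POJO rules, except that the List field is extracted as GenericTypeInfo
    public static class Order {
        public String id;
        public List<String> items;

        public Order() {}
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableGenericTypes();
        // Throws the UnsupportedOperationException above as soon as the
        // serializer for the List field is created:
        env.fromElements(new Order()).print();
        env.execute("repro");
    }
}
{code}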

Consider the following code from flink-core v1.13.2 (it looks the same in v1.14.4):
{code:java}
private <OUT, IN1, IN2> TypeInformation<OUT> privateGetForClass(
        Class<OUT> clazz,
        List<Type> typeHierarchy,
        ParameterizedType parameterizedType,
        TypeInformation<IN1> in1Type,
        TypeInformation<IN2> in2Type) {
    checkNotNull(clazz);

    // check if type information can be produced using a factory
    final TypeInformation<OUT> typeFromFactory =
            createTypeInfoFromFactory(clazz, typeHierarchy, in1Type, in2Type);
    if (typeFromFactory != null) {
        return typeFromFactory;
    }

    // Object is handled as generic type info
    if (clazz.equals(Object.class)) {
        return new GenericTypeInfo<>(clazz);
    }

    // Class is handled as generic type info
    if (clazz.equals(Class.class)) {
        return new GenericTypeInfo<>(clazz);
    }

    // recursive types are handled as generic type info
    if (countTypeInHierarchy(typeHierarchy, clazz) > 1) {
        return new GenericTypeInfo<>(clazz);
    }

    // check for arrays
    if (clazz.isArray()) {

        // primitive arrays: int[], byte[], ...
        PrimitiveArrayTypeInfo<OUT> primitiveArrayInfo =
                PrimitiveArrayTypeInfo.getInfoFor(clazz);
        if (primitiveArrayInfo != null) {
            return primitiveArrayInfo;
        }

        // basic type arrays: String[], Integer[], Double[]
        BasicArrayTypeInfo<OUT, ?> basicArrayInfo = BasicArrayTypeInfo.getInfoFor(clazz);
        if (basicArrayInfo != null) {
            return basicArrayInfo;
        }

        // object arrays
        else {
            TypeInformation<?> componentTypeInfo =
                    createTypeInfoWithTypeHierarchy(
                            typeHierarchy, clazz.getComponentType(), in1Type, in2Type);

            return ObjectArrayTypeInfo.getInfoFor(clazz, componentTypeInfo);
        }
    }

    // check for writable types
    if (isHadoopWritable(clazz)) {
        return createHadoopWritableTypeInfo(clazz);
    }

    // check for basic types
    TypeInformation<OUT> basicTypeInfo = BasicTypeInfo.getInfoFor(clazz);
    if (basicTypeInfo != null) {
        return basicTypeInfo;
    }

    // check for SQL time types
    TypeInformation<OUT> timeTypeInfo = SqlTimeTypeInfo.getInfoFor(clazz);
    if (timeTypeInfo != null) {
        return timeTypeInfo;
    }

    // check for subclasses of Value
    if (Value.class.isAssignableFrom(clazz)) {
        Class<? extends Value> valueClass = clazz.asSubclass(Value.class);
        return (TypeInformation<OUT>) ValueTypeInfo.getValueTypeInfo(valueClass);
    }

    // check for subclasses of Tuple
    if (Tuple.class.isAssignableFrom(clazz)) {
        if (clazz == Tuple0.class) {
            return new TupleTypeInfo(Tuple0.class);
        }
        throw new InvalidTypesException(
                "Type information extraction for tuples (except Tuple0) cannot 
be done based on the class.");
    }

    // check for Enums
    if (Enum.class.isAssignableFrom(clazz)) {
        return new EnumTypeInfo(clazz);
    }

    // special case for POJOs generated by Avro.
    if (hasSuperclass(clazz, AVRO_SPECIFIC_RECORD_BASE_CLASS)) {
        return AvroUtils.getAvroUtils().createAvroTypeInfo(clazz);
    }

    if (Modifier.isInterface(clazz.getModifiers())) {
        // Interface has no members and is therefore not handled as POJO
        return new GenericTypeInfo<>(clazz);
    }

    try {
        Type t = parameterizedType != null ? parameterizedType : clazz;
        TypeInformation<OUT> pojoType =
                analyzePojo(t, new ArrayList<>(typeHierarchy), in1Type, in2Type);
        if (pojoType != null) {
            return pojoType;
        }
    } catch (InvalidTypesException e) {
        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "Unable to handle type " + clazz + " as POJO. Message: " + 
e.getMessage(),
                    e);
        }
        // ignore and create generic type info
    }

    // return a generic type
    return new GenericTypeInfo<>(clazz);
} {code}
 

Only the following types are compatible (i.e. not treated as GenericType):
 * all custom POJOs with the @TypeInfo annotation
 * arrays
 * all Hadoop Writable types
 * basic types (String, BigInteger/BigDecimal, Instant/Date, boxed primitives)
 * primitive types
 * SQL time types
 * everything implementing org.apache.flink.types.Value
 * everything implementing org.apache.flink.api.java.tuple.Tuple
 * enums
 * Avro types
 * nested POJOs

But not:
 * List and Map, since they fall into `Modifier.isInterface(clazz.getModifiers())`
 * UUID, since it is treated as a generic POJO (it has no getter/setter for all of its fields)

 
{quote}By the way, we can't register our own custom serializer, which would really be the perfect world (the @TypeInfo documentation says that there is a TypeExtractor#registerFactory(Type, Class)... but there isn't).
{quote}
 

 
h3. How to fix it?

The ListTypeInfo and MapTypeInfo classes already exist and could simply be used by TypeExtractor.privateGetForClass(...); a sketch of that special-casing follows.
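
A hedged sketch of what that special-casing could look like, inserted before the Modifier.isInterface(...) check (this is a proposal, not actual Flink code; the casts are unchecked):
{code:java}
// Proposal sketch: handle List/Map before the interface check so they
// stop falling through to GenericTypeInfo.
if (List.class.isAssignableFrom(clazz) && parameterizedType != null) {
    TypeInformation<?> elementInfo =
            createTypeInfoWithTypeHierarchy(
                    typeHierarchy,
                    parameterizedType.getActualTypeArguments()[0],
                    in1Type,
                    in2Type);
    return (TypeInformation<OUT>) new ListTypeInfo<>(elementInfo);
}
if (Map.class.isAssignableFrom(clazz) && parameterizedType != null) {
    Type[] kv = parameterizedType.getActualTypeArguments();
    return (TypeInformation<OUT>)
            new MapTypeInfo<>(
                    createTypeInfoWithTypeHierarchy(typeHierarchy, kv[0], in1Type, in2Type),
                    createTypeInfoWithTypeHierarchy(typeHierarchy, kv[1], in1Type, in2Type));
}
{code}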

For UUID, we could create a customisable TypeInformationFactory that contains all the specific handling which does not fit the native Flink libraries. The other way is to add it as a BasicType. A field-level workaround along those lines is sketched below.
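
In the meantime, a field-level @TypeInfo factory looks like a usable workaround for collection fields on versions that support the annotation on fields (the Event and ListOfStringsFactory names below are hypothetical; for UUID the same pattern would additionally need a custom TypeInformation and serializer):
{code:java}
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.ListTypeInfo;

public class Event {
    public String id;

    // Forces ListTypeInfo instead of GenericTypeInfo for this field
    @TypeInfo(ListOfStringsFactory.class)
    public List<String> tags;

    public Event() {}

    public static class ListOfStringsFactory extends TypeInfoFactory<List<String>> {
        @Override
        public TypeInformation<List<String>> createTypeInfo(
                Type t, Map<String, TypeInformation<?>> genericParameters) {
            return new ListTypeInfo<>(Types.STRING);
        }
    }
}
{code}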

 

_+I can help to contribute!+_

Thanks!

> [Java][TypeExtractor] Missing type information in POJO types of some types 
> (List, Map, UUID)
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26470
>                 URL: https://issues.apache.org/jira/browse/FLINK-26470
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>    Affects Versions: 1.13.2
>            Reporter: Antoine Michaud
>            Priority: Minor
>              Labels: pojo



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
