[
https://issues.apache.org/jira/browse/FLINK-26470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Antoine Michaud updated FLINK-26470:
------------------------------------
Description:
h2. Problem:
h4. Basic collections (List, Map) and custom types are not compatible with Flink POJO serialization.
Here is the exception:
{code:java}
Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
    at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:346)
[... nothing interesting ...]{code}
h2. Explanation:
As the docs say, we should not use Kryo in production since its performance is poor.
To stop using Kryo and use the native POJO serialization, we do this:
{code:java}
env.getConfig().disableGenericTypes(){code}
But POJOs have to meet [some requirements|https://nightlies.apache.org/flink/flink-docs-release-1.10/dev/types_serialization.html#rules-for-pojo-types].
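For reference, a minimal sketch of a class that satisfies those rules (all names are illustrative):
{code:java}
// A minimal valid Flink POJO: a public, non-final class with a public
// no-argument constructor and public (or getter/setter) fields whose
// types are themselves handled by the type extractor.
public class Order {
    public long id;
    public String product;

    public Order() {}
}
{code}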
Consider the following code from flink-core v1.13.2 (it looks the same in v1.14.4):
{code:java}
private <OUT, IN1, IN2> TypeInformation<OUT> privateGetForClass(
        Class<OUT> clazz,
        List<Type> typeHierarchy,
        ParameterizedType parameterizedType,
        TypeInformation<IN1> in1Type,
        TypeInformation<IN2> in2Type) {
    checkNotNull(clazz);

    // check if type information can be produced using a factory
    final TypeInformation<OUT> typeFromFactory =
            createTypeInfoFromFactory(clazz, typeHierarchy, in1Type, in2Type);
    if (typeFromFactory != null) {
        return typeFromFactory;
    }

    // Object is handled as generic type info
    if (clazz.equals(Object.class)) {
        return new GenericTypeInfo<>(clazz);
    }

    // Class is handled as generic type info
    if (clazz.equals(Class.class)) {
        return new GenericTypeInfo<>(clazz);
    }

    // recursive types are handled as generic type info
    if (countTypeInHierarchy(typeHierarchy, clazz) > 1) {
        return new GenericTypeInfo<>(clazz);
    }

    // check for arrays
    if (clazz.isArray()) {

        // primitive arrays: int[], byte[], ...
        PrimitiveArrayTypeInfo<OUT> primitiveArrayInfo =
                PrimitiveArrayTypeInfo.getInfoFor(clazz);
        if (primitiveArrayInfo != null) {
            return primitiveArrayInfo;
        }

        // basic type arrays: String[], Integer[], Double[]
        BasicArrayTypeInfo<OUT, ?> basicArrayInfo = BasicArrayTypeInfo.getInfoFor(clazz);
        if (basicArrayInfo != null) {
            return basicArrayInfo;
        }

        // object arrays
        else {
            TypeInformation<?> componentTypeInfo =
                    createTypeInfoWithTypeHierarchy(
                            typeHierarchy, clazz.getComponentType(), in1Type, in2Type);

            return ObjectArrayTypeInfo.getInfoFor(clazz, componentTypeInfo);
        }
    }

    // check for writable types
    if (isHadoopWritable(clazz)) {
        return createHadoopWritableTypeInfo(clazz);
    }

    // check for basic types
    TypeInformation<OUT> basicTypeInfo = BasicTypeInfo.getInfoFor(clazz);
    if (basicTypeInfo != null) {
        return basicTypeInfo;
    }

    // check for SQL time types
    TypeInformation<OUT> timeTypeInfo = SqlTimeTypeInfo.getInfoFor(clazz);
    if (timeTypeInfo != null) {
        return timeTypeInfo;
    }

    // check for subclasses of Value
    if (Value.class.isAssignableFrom(clazz)) {
        Class<? extends Value> valueClass = clazz.asSubclass(Value.class);
        return (TypeInformation<OUT>) ValueTypeInfo.getValueTypeInfo(valueClass);
    }

    // check for subclasses of Tuple
    if (Tuple.class.isAssignableFrom(clazz)) {
        if (clazz == Tuple0.class) {
            return new TupleTypeInfo(Tuple0.class);
        }
        throw new InvalidTypesException(
                "Type information extraction for tuples (except Tuple0) cannot be done based on the class.");
    }

    // check for Enums
    if (Enum.class.isAssignableFrom(clazz)) {
        return new EnumTypeInfo(clazz);
    }

    // special case for POJOs generated by Avro.
    if (hasSuperclass(clazz, AVRO_SPECIFIC_RECORD_BASE_CLASS)) {
        return AvroUtils.getAvroUtils().createAvroTypeInfo(clazz);
    }

    if (Modifier.isInterface(clazz.getModifiers())) {
        // Interface has no members and is therefore not handled as POJO
        return new GenericTypeInfo<>(clazz);
    }

    try {
        Type t = parameterizedType != null ? parameterizedType : clazz;
        TypeInformation<OUT> pojoType =
                analyzePojo(t, new ArrayList<>(typeHierarchy), in1Type, in2Type);
        if (pojoType != null) {
            return pojoType;
        }
    } catch (InvalidTypesException e) {
        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "Unable to handle type " + clazz + " as POJO. Message: " + e.getMessage(),
                    e);
        }
        // ignore and create generic type info
    }

    // return a generic type
    return new GenericTypeInfo<>(clazz);
}{code}
Only the following types are compatible (i.e. not treated as GenericType):
* all custom POJOs carrying the @TypeInfo annotation
* arrays
* all Hadoop Writable types
* basic types (String, BigInteger/BigDecimal, Instant/Date, boxed primitives)
* primitive types
* SQL time types
* everything implementing org.apache.flink.types.Value
* all subclasses of org.apache.flink.api.java.tuple.Tuple
* enums
* Avro types
* nested POJOs

But not (a reproducer sketch follows this list):
* List and Map, since they fall into the `Modifier.isInterface(clazz.getModifiers())` branch
* UUID, since it is treated as a generic POJO (no getters/setters for all fields)
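A minimal reproducer sketch (class and field names are illustrative):
{code:java}
import java.util.List;
import java.util.UUID;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical reproducer: with generic types disabled, building a job
// around this POJO fails as soon as Flink creates the serializers.
public class Event {
    public UUID id;            // analyzed as a POJO, but UUID has no setters
    public List<String> tags;  // interface, so it becomes GenericTypeInfo

    public Event() {}

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableGenericTypes();
        // throws UnsupportedOperationException: "Generic types have been
        // disabled ... java.util.List is treated as a generic type"
        env.fromElements(new Event()).print();
        env.execute();
    }
}
{code}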
{quote}By the way, we cannot register our own custom serializer, which would really be the perfect world (the @TypeInfo documentation says that there is a TypeExtractor#registerFactory(Type, Class) method, but there isn't).
{quote}
h3. How to fix it?
ListTypeInfo and MapTypeInfo already exist and could simply be used by TypeExtractor.privateGetForClass(...).
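They already work today when supplied by hand, e.g. via DataStream#returns(TypeInformation); a minimal sketch using their existing public constructors:
{code:java}
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.api.java.typeutils.MapTypeInfo;

// The type information the extractor could return for List/Map fields;
// today it has to be passed explicitly, e.g. to DataStream#returns(...).
TypeInformation<List<String>> listInfo =
        new ListTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO);
TypeInformation<Map<String, Long>> mapInfo =
        new MapTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
{code}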
For UUID, we could create a customisable TypeInformationFactory registry that holds all the specific types that do not fit the native Flink type system (a hypothetical sketch follows). The other way is to add it as a BasicType.
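Purely as an illustration of the proposal, such a registry could look like the following; nothing below exists in Flink today, and all names are hypothetical:
{code:java}
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.api.common.typeinfo.TypeInfoFactory;

// Hypothetical registry sketch: TypeExtractor#createTypeInfoFromFactory(...)
// could consult it before falling back to POJO analysis. Not Flink API.
public final class TypeInfoFactoryRegistry {

    private static final Map<Class<?>, TypeInfoFactory<?>> FACTORIES =
            new ConcurrentHashMap<>();

    private TypeInfoFactoryRegistry() {}

    public static <T> void register(Class<T> type, TypeInfoFactory<T> factory) {
        FACTORIES.put(type, factory);
    }

    @SuppressWarnings("unchecked")
    public static <T> Optional<TypeInfoFactory<T>> lookup(Class<T> type) {
        return Optional.ofNullable((TypeInfoFactory<T>) FACTORIES.get(type));
    }
}
{code}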
_+I can help to contribute!+_
Thanks!
> [Java][TypeExtractor] Missing type information in POJO types of some types (List, Map, UUID)
> --------------------------------------------------------------------------------------------
>
> Key: FLINK-26470
> URL: https://issues.apache.org/jira/browse/FLINK-26470
> Project: Flink
> Issue Type: Bug
> Components: API / Core
> Affects Versions: 1.13.2
> Reporter: Antoine Michaud
> Priority: Minor
> Labels: pojo
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)