[
https://issues.apache.org/jira/browse/FLINK-11372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Tzu-Li (Gordon) Tai updated FLINK-11372:
----------------------------------------
Description:
In {{CollectionSerializerConfigSnapshot}}:
{code}
@Override
public TypeSerializerSchemaCompatibility<C> resolveSchemaCompatibility(TypeSerializer<C> newSerializer) {
    if (newSerializer instanceof ListSerializer) {
        ListSerializer<T> newListSerializer = (ListSerializer<T>) newSerializer;
        ListSerializerSnapshot<T> listSerializerSnapshot = new ListSerializerSnapshot<>(newListSerializer);

        @SuppressWarnings("unchecked")
        TypeSerializerSchemaCompatibility<C> result = (TypeSerializerSchemaCompatibility<C>)
            listSerializerSnapshot.resolveSchemaCompatibility(newListSerializer);
        return result;
    } else {
        return super.resolveSchemaCompatibility(newSerializer);
    }
}
{code}
The compatibility check for {{ListSerializer}} is delegated to the new list
serializer snapshot class, {{ListSerializerSnapshot}}.
However, it is incorrect to let the delegate wrap the new serializer (and
therefore the new nested element serializer). Doing so effectively checks the
new serializer's compatibility with itself, whereas the check should be made
against the restored serializer.
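
To make the failure mode concrete, here is a minimal, self-contained sketch. It does not use Flink's API: every class and method name in it ({{ElementSerializer}}, {{ListSerializer}}, {{ListSnapshot}}, {{checkAgainstSelf}}, {{checkAgainstRestored}}) is a hypothetical stand-in, and a serializer's format is reduced to a plain string. It only illustrates why a snapshot built from the new serializer can never report an incompatibility, and it is not meant to show how the actual fix is implemented.

```java
class CompatibilityDelegationSketch {

    /** Hypothetical stand-in for a nested element serializer, identified only by a format name. */
    static final class ElementSerializer {
        final String format;

        ElementSerializer(String format) {
            this.format = format;
        }
    }

    /** Hypothetical stand-in for a list serializer that wraps a nested element serializer. */
    static final class ListSerializer {
        final ElementSerializer elementSerializer;

        ListSerializer(ElementSerializer elementSerializer) {
            this.elementSerializer = elementSerializer;
        }
    }

    /** Hypothetical stand-in for a snapshot: remembers the element format of the serializer it wraps. */
    static final class ListSnapshot {
        final String writtenElementFormat;

        ListSnapshot(ListSerializer serializer) {
            this.writtenElementFormat = serializer.elementSerializer.format;
        }

        /** Compatible only if the new serializer uses the format recorded in this snapshot. */
        boolean isCompatibleWith(ListSerializer newSerializer) {
            return writtenElementFormat.equals(newSerializer.elementSerializer.format);
        }
    }

    /** Pattern described in the issue: the delegate snapshot wraps the NEW serializer. */
    static boolean checkAgainstSelf(ListSerializer restored, ListSerializer newSerializer) {
        // The restored serializer is ignored, so the new serializer is compared with itself.
        ListSnapshot snapshot = new ListSnapshot(newSerializer);
        return snapshot.isCompatibleWith(newSerializer);
    }

    /** Intended behavior: the delegate snapshot wraps the RESTORED serializer. */
    static boolean checkAgainstRestored(ListSerializer restored, ListSerializer newSerializer) {
        ListSnapshot snapshot = new ListSnapshot(restored);
        return snapshot.isCompatibleWith(newSerializer);
    }

    public static void main(String[] args) {
        ListSerializer restored = new ListSerializer(new ElementSerializer("v1"));
        ListSerializer changed = new ListSerializer(new ElementSerializer("v2"));

        // The element format changed, yet the self-check still reports compatibility.
        System.out.println("self check:     " + checkAgainstSelf(restored, changed));
        System.out.println("restored check: " + checkAgainstRestored(restored, changed));
    }
}
```

In this simplified model the self-check reports compatibility for any input, because the snapshot and the serializer under test always carry the same format. Only the variant that wraps the restored serializer can detect a change in the nested element serializer.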
> Incorrect delegation of compatibility checks to new snapshots in CollectionSerializerConfigSnapshot
> ---------------------------------------------------------------------------------------------------
>
> Key: FLINK-11372
> URL: https://issues.apache.org/jira/browse/FLINK-11372
> Project: Flink
> Issue Type: Bug
> Components: Type Serialization System
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Major
> Fix For: 1.8.0