[ https://issues.apache.org/jira/browse/FLINK-11073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tzu-Li (Gordon) Tai closed FLINK-11073.
---------------------------------------
Resolution: Fixed
Merged for 1.8.0: cad0509a752be57eb995a95ccebf00012e443ed5
> Make serializers immutable / provide option
> TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer
> ------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-11073
> URL: https://issues.apache.org/jira/browse/FLINK-11073
> Project: Flink
> Issue Type: Improvement
> Components: Type Serialization System
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.8.0
>
> Time Spent: 50m
> Remaining Estimate: 0h
>
> h2. Motivation
> Right now, when a new serializer is provided to the old serializer (or, more
> specifically, the old serializer's snapshot) for a state schema compatibility
> check, and the new serializer can be reconfigured to become compatible, the
> only way to do so is to reconfigure the new serializer in place and return
> {{TypeSerializerSchemaCompatibility.compatibleAsIs()}} as the result of the
> compatibility check.
> One concrete example is the {{KryoSerializer}}, whose configuration includes
> a map from serialized classes to their registered ids. This mapping may
> differ in a restored execution, and the new {{KryoSerializer}} must
> reconfigure it to match the previous execution before it can be used for
> state access.
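The following minimal sketch shows what reconfiguring the mapping to match the previous execution could involve. It assumes this policy: every class registered in the previous execution keeps its restored id, and classes registered only in the new execution get ids after the highest restored one. The class {{RegistrationRemapper}}, its method {{reconcile}} and the use of class-name strings as keys are all hypothetical. This is not the {{KryoSerializer}} code. Note that it builds a new map rather than modifying either input.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical helper (not Flink's KryoSerializer): reconciles the class-to-id
 * registrations of a new execution with those restored from the previous one.
 */
final class RegistrationRemapper {

    static Map<String, Integer> reconcile(
            Map<String, Integer> previousRegistrations,
            List<String> newlyRegisteredClasses) {
        // Start from the restored ids so that existing state remains readable.
        Map<String, Integer> result = new LinkedHashMap<>(previousRegistrations);
        int nextId = previousRegistrations.values().stream()
                .mapToInt(Integer::intValue)
                .max()
                .orElse(-1) + 1;
        for (String className : newlyRegisteredClasses) {
            // Only classes unknown to the previous execution receive fresh ids.
            if (!result.containsKey(className)) {
                result.put(className, nextId++);
            }
        }
        return result; // a new map; neither argument is modified
    }

    public static void main(String[] args) {
        Map<String, Integer> previous = new LinkedHashMap<>();
        previous.put("com.example.A", 10);
        previous.put("com.example.B", 11);
        // The new job registers B before A and additionally registers C.
        Map<String, Integer> reconciled = reconcile(
                previous, List.of("com.example.B", "com.example.A", "com.example.C"));
        System.out.println(reconciled); // {com.example.A=10, com.example.B=11, com.example.C=12}
    }
}
```

Registration order in the new job is deliberately ignored. Only the restored ids decide how existing classes are encoded.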
> Right now, this is performed by directly mutating the map in the new
> serializer instance.
> This mutative behaviour is fragile, especially in scale-down / scale-up
> scenarios, which can easily result in mismatched state serializer
> configurations across TaskManagers (TMs).
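One way in-place mutation can go wrong is easy to show. Any copy of the serializer created before the reconfiguration (for example a duplicate handed to another thread) keeps the old mapping. The hypothetical {{SketchSerializer}} below is not a Flink class. It contrasts mutating in place with returning a reconfigured copy, and both styles live on one class only for comparison. For brevity, the reconfigured copy simply adopts the restored mapping as-is.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical serializer stand-in (not a Flink class) whose configuration is a
 * class-to-id registration map. Both reconfiguration styles are shown side by side.
 */
final class SketchSerializer {
    private final Map<String, Integer> registrations;

    SketchSerializer(Map<String, Integer> registrations) {
        this.registrations = new HashMap<>(registrations); // private copy of the config
    }

    /** Copies the current configuration, like a duplicate handed to another thread. */
    SketchSerializer duplicate() {
        return new SketchSerializer(registrations);
    }

    /** Current style: overwrite this instance's mapping; earlier duplicates are not updated. */
    void reconfigureInPlace(Map<String, Integer> restored) {
        registrations.clear();
        registrations.putAll(restored);
    }

    /** Proposed style: leave this instance untouched and return a reconfigured copy. */
    SketchSerializer reconfigured(Map<String, Integer> restored) {
        return new SketchSerializer(restored);
    }

    Integer idOf(String className) {
        return registrations.get(className);
    }

    public static void main(String[] args) {
        Map<String, Integer> restored = Map.of("A", 10);

        // In place: a duplicate taken before the compatibility check keeps the stale id.
        SketchSerializer mutable = new SketchSerializer(Map.of("A", 0));
        SketchSerializer earlyCopy = mutable.duplicate();
        mutable.reconfigureInPlace(restored);
        System.out.println(mutable.idOf("A") + " vs " + earlyCopy.idOf("A")); // 10 vs 0

        // Copy-on-reconfigure: the original is unchanged; callers must adopt the returned instance.
        SketchSerializer original = new SketchSerializer(Map.of("A", 0));
        SketchSerializer reconfiguredCopy = original.reconfigured(restored);
        System.out.println(original.idOf("A") + " / " + reconfiguredCopy.idOf("A")); // 0 / 10
    }
}
```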
> h2. Proposed Approach
> # The {{TypeSerializerSchemaCompatibility}} result class should be extended
> with an option {{compatibleWithReconfiguredSerializer(TypeSerializer)}},
> which wraps a new, reconfigured instance of the new serializer.
> # Callers of the compatibility check need to be aware of this case and
> respect it, using the reconfigured serializer instance whenever one is
> provided. In Flink, there are two places that perform compatibility checks
> on serializers: 1) composite serializers, which contain nested serializers
> and therefore need to check the compatibility of those nested serializers,
> and 2) state backends, which check the compatibility of the new serializer
> with the old serializer.
> # Introduce {{CompositeTypeSerializerSnapshot}}, a base abstract class that
> encapsulates the handling of reconfigured nested serializers so that the
> logic can be shared by many of Flink's composite serializers: if a composite
> serializer has a nested serializer that returns a new reconfigured instance
> of itself, then the result of the compatibility check on the composite
> serializer should also wrap a reconfigured version of the composite
> serializer that holds the reconfigured nested serializer.
> # For composite serializers that still use the legacy, less powerful
> {{TypeSerializerConfigSnapshot}} and {{CompatibilityResult}} abstractions,
> if a nested serializer signals that it has reconfigured itself, this should
> be detected and an error thrown stating that the outer composite serializer
> needs to be upgraded to the new serializer snapshot and compatibility
> abstractions. This follows the same approach we used to bridge the new
> {{TypeSerializerSchemaCompatibility}} and old {{CompatibilityResult}}
> classes in Flink 1.7.
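The four points of the proposed approach could fit together roughly as sketched below. This is a simplified, hypothetical model. {{Ser}}, {{LeafSer}}, {{CompositeSer}}, {{Compat}} and {{CompatSketch}} are illustrative stand-ins, not Flink's {{TypeSerializer}}, {{TypeSerializerSchemaCompatibility}} or {{CompositeTypeSerializerSnapshot}}. The precedence used when combining nested results (incompatible first, then migration, then reconfigured) is also an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical model of the proposal; none of these types are Flink's real classes. */
final class CompatSketch {
    /** Point 3: combine nested results; a reconfigured nested serializer yields a NEW composite. */
    static Compat resolveComposite(List<Ser> newNested, List<Compat> nestedResults) {
        if (nestedResults.stream().anyMatch(r -> r.kind == Compat.Kind.INCOMPATIBLE)) {
            return Compat.incompatible();
        }
        if (nestedResults.stream().anyMatch(r -> r.kind == Compat.Kind.AFTER_MIGRATION)) {
            return Compat.compatibleAfterMigration();
        }
        List<Ser> resolved = new ArrayList<>();
        boolean anyReconfigured = false;
        for (int i = 0; i < newNested.size(); i++) {
            Compat r = nestedResults.get(i);
            boolean reconfigured = r.kind == Compat.Kind.WITH_RECONFIGURED_SERIALIZER;
            anyReconfigured |= reconfigured;
            // Keep the new nested serializer unless its snapshot supplied a reconfigured one.
            resolved.add(reconfigured ? r.reconfigured : newNested.get(i));
        }
        return anyReconfigured
                ? Compat.compatibleWithReconfiguredSerializer(new CompositeSer(resolved))
                : Compat.compatibleAsIs();
    }

    /** Point 2: a caller (e.g. a state backend) uses the reconfigured instance when one is given. */
    static Ser serializerForStateAccess(Ser newSerializer, Compat result) {
        switch (result.kind) {
            case AS_IS:
            case AFTER_MIGRATION: // a real backend would migrate existing state first (omitted)
                return newSerializer;
            case WITH_RECONFIGURED_SERIALIZER:
                return result.reconfigured;
            default:
                throw new IllegalStateException("new serializer is incompatible");
        }
    }

    /** Point 4: a legacy outer serializer cannot carry a reconfigured nested serializer. */
    static void checkLegacyOuter(Compat nestedResult) {
        if (nestedResult.kind == Compat.Kind.WITH_RECONFIGURED_SERIALIZER) {
            throw new IllegalStateException(
                    "nested serializer was reconfigured; upgrade the outer serializer's snapshot");
        }
    }

    public static void main(String[] args) {
        List<Ser> nested = List.of(new LeafSer("String", 1), new LeafSer("Kryo", 1));
        // Pretend the Kryo snapshot reconfigured the new Kryo serializer to restored config 7.
        List<Compat> results = List.of(
                Compat.compatibleAsIs(),
                Compat.compatibleWithReconfiguredSerializer(new LeafSer("Kryo", 7)));
        Compat outer = resolveComposite(nested, results);
        Ser used = serializerForStateAccess(new CompositeSer(nested), outer);
        System.out.println(outer.kind + " -> " + used.describe());
        // prints: WITH_RECONFIGURED_SERIALIZER -> Composite[String(1), Kryo(7)]
    }
}

/** Stand-in for TypeSerializer. */
interface Ser {
    String describe();
}

/** Leaf serializer; the int stands in for its configuration (e.g. a registration map). */
final class LeafSer implements Ser {
    final String name;
    final int config;
    LeafSer(String name, int config) { this.name = name; this.config = config; }
    public String describe() { return name + "(" + config + ")"; }
}

/** Composite serializer defined entirely by its immutable list of nested serializers. */
final class CompositeSer implements Ser {
    final List<Ser> nested;
    CompositeSer(List<Ser> nested) { this.nested = List.copyOf(nested); }
    public String describe() {
        return nested.stream().map(Ser::describe).collect(Collectors.joining(", ", "Composite[", "]"));
    }
}

/** Simplified result type with the options named in the proposal. */
final class Compat {
    enum Kind { AS_IS, AFTER_MIGRATION, WITH_RECONFIGURED_SERIALIZER, INCOMPATIBLE }
    final Kind kind;
    final Ser reconfigured; // non-null only for WITH_RECONFIGURED_SERIALIZER
    private Compat(Kind kind, Ser reconfigured) { this.kind = kind; this.reconfigured = reconfigured; }
    static Compat compatibleAsIs() { return new Compat(Kind.AS_IS, null); }
    static Compat compatibleAfterMigration() { return new Compat(Kind.AFTER_MIGRATION, null); }
    static Compat incompatible() { return new Compat(Kind.INCOMPATIBLE, null); }
    static Compat compatibleWithReconfiguredSerializer(Ser s) { return new Compat(Kind.WITH_RECONFIGURED_SERIALIZER, s); }
}
```

The design point this illustrates is that no serializer instance is ever mutated. The reconfigured composite is a new object, and the caller switches to it only because the compatibility result tells it to.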
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)