[ https://issues.apache.org/jira/browse/FLINK-11073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16709761#comment-16709761 ]

ASF GitHub Bot commented on FLINK-11073:
----------------------------------------

tzulitai opened a new pull request #7239: [FLINK-11073] [core] Allow immutable serializers
URL: https://github.com/apache/flink/pull/7239
 
 
   ## What is the purpose of the change
   
   See JIRA [FLINK-11073](https://issues.apache.org/jira/browse/FLINK-11073) for a detailed description of the motivation for this PR.
   
   ## Brief change log
   
   - Introduce `TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(TypeSerializer)` as a new option for compatibility check results.
   - Introduce the `CompositeTypeSerializerSnapshot` class. It encapsulates the logic for writing, reading, and deriving the final compatibility result for composite serializers that have multiple nested serializers, and it also handles the new `TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(TypeSerializer)` case.
   - Let all serializer snapshots that previously used the old `CompositeSerializerSnapshot` class extend the new `CompositeTypeSerializerSnapshot` instead.
   - Rename `CompositeSerializerSnapshot` to `NestedSerializersSnapshotDelegate` to better convey its purpose after the rework. The class is now also declared `final`, since users should use the `CompositeTypeSerializerSnapshot` class instead.
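To make the new result option concrete, here is a small, self-contained Java sketch of how a composite snapshot might derive its final result from the results of its nested snapshots. `Kind`, `Compat`, `CompositeResolver` and the plain strings standing in for serializers are hypothetical simplifications, not Flink's actual `TypeSerializerSchemaCompatibility` or `CompositeTypeSerializerSnapshot` API, and the precedence order (incompatible, then migration, then reconfiguration, then as-is) is an assumption about what such a snapshot would reasonably do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

class CompositeResolutionDemo {
    public static void main(String[] args) {
        // New nested serializers of a hypothetical two-field tuple serializer.
        List<String> newNested = List.of("IntSerializer", "KryoSerializer[new ids]");
        // Results reported by the corresponding nested snapshots.
        List<Compat<String>> nestedResults = List.of(
                Compat.<String>compatibleAsIs(),
                Compat.compatibleWithReconfiguredSerializer("KryoSerializer[restored ids]"));
        Compat<String> outer = CompositeResolver.resolve(
                newNested, nestedResults, nested -> "TupleSerializer" + nested);
        // Prints: COMPATIBLE_WITH_RECONFIGURED_SERIALIZER -> TupleSerializer[IntSerializer, KryoSerializer[restored ids]]
        System.out.println(outer.kind + " -> " + outer.reconfigured);
    }
}

enum Kind { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, COMPATIBLE_WITH_RECONFIGURED_SERIALIZER, INCOMPATIBLE }

/** Hypothetical, simplified stand-in for a schema compatibility result. */
final class Compat<S> {
    final Kind kind;
    final S reconfigured; // non-null only for COMPATIBLE_WITH_RECONFIGURED_SERIALIZER

    private Compat(Kind kind, S reconfigured) {
        this.kind = kind;
        this.reconfigured = reconfigured;
    }

    static <S> Compat<S> compatibleAsIs() { return new Compat<>(Kind.COMPATIBLE_AS_IS, null); }
    static <S> Compat<S> compatibleAfterMigration() { return new Compat<>(Kind.COMPATIBLE_AFTER_MIGRATION, null); }
    static <S> Compat<S> incompatible() { return new Compat<>(Kind.INCOMPATIBLE, null); }

    static <S> Compat<S> compatibleWithReconfiguredSerializer(S serializer) {
        return new Compat<>(Kind.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER, Objects.requireNonNull(serializer));
    }
}

final class CompositeResolver {
    /** Derives the outer result; the outer serializer is rebuilt via outerFactory only if a nested one was reconfigured. */
    static <S> Compat<S> resolve(List<S> newNested, List<Compat<S>> nestedResults, Function<List<S>, S> outerFactory) {
        if (newNested.size() != nestedResults.size()) {
            throw new IllegalArgumentException("one result per nested serializer is required");
        }
        boolean needsMigration = false;
        boolean anyReconfigured = false;
        for (Compat<S> result : nestedResults) {
            switch (result.kind) {
                case INCOMPATIBLE:
                    return Compat.incompatible(); // one incompatible field makes the whole composite incompatible
                case COMPATIBLE_AFTER_MIGRATION:
                    needsMigration = true;
                    break;
                case COMPATIBLE_WITH_RECONFIGURED_SERIALIZER:
                    anyReconfigured = true;
                    break;
                default:
                    break;
            }
        }
        if (needsMigration) {
            return Compat.compatibleAfterMigration();
        }
        if (!anyReconfigured) {
            return Compat.compatibleAsIs();
        }
        // Build a NEW outer serializer from the reconfigured nested instances where present.
        List<S> chosen = new ArrayList<>();
        for (int i = 0; i < newNested.size(); i++) {
            Compat<S> result = nestedResults.get(i);
            chosen.add(result.kind == Kind.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER ? result.reconfigured : newNested.get(i));
        }
        return Compat.compatibleWithReconfiguredSerializer(outerFactory.apply(chosen));
    }
}
```

Note that neither the original nested serializers nor the outer serializer are mutated; the reconfigured outer instance is only ever handed back inside the result.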
   
   ## Verifying this change
   
   All corresponding `XXSerializerSnapshotMigrationTest`s for the touched serializer snapshots should still pass.
   
   ## Does this pull request potentially affect one of the following parts:
   
   This **does** affect one `@PublicEvolving` API, `CompositeSerializerSnapshot`, which has been renamed and is now `@Internal`.
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no)
     - The serializers: (**yes** / no / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   



> Make serializers immutable / provide option 
> TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer
> ------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-11073
>                 URL: https://issues.apache.org/jira/browse/FLINK-11073
>             Project: Flink
>          Issue Type: Improvement
>          Components: Type Serialization System
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.8.0
>
>
> h2. Motivation
> Right now, when a new serializer is provided to the old serializer (or, more
> specifically, the old serializer's snapshot) for state schema compatibility
> checks, and the new serializer can be reconfigured so that it becomes
> compatible, the only way to do so is to reconfigure the new serializer in
> place and return {{TypeSerializerSchemaCompatibility.compatibleAsIs()}} as the
> result of the compatibility check.
> One concrete example is the {{KryoSerializer}}, whose configuration includes
> a map from serialized classes to their registered IDs. This mapping may change
> across restored executions, and the new {{KryoSerializer}} must reconfigure it
> to match the previous execution before it can be used for state access.
> Right now, this is performed by directly mutating the map in the new 
> serializer instance.
> This mutating behaviour is fragile, especially in scale-down / scale-up
> scenarios, which can easily result in mismatched state serializer
> configurations across TaskManagers (TMs).
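A minimal sketch of the immutable alternative, assuming a hypothetical `RegistrationSerializer` that models only the class-to-ID map (it is not Flink's `KryoSerializer`): instead of overwriting its own map, the serializer returns a new instance aligned with the restored IDs, so any code still holding the original instance sees an unchanged configuration. Giving classes that are only known to the new serializer fresh IDs after the highest restored ID is an illustrative choice, not necessarily what Kryo registration does.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

class ImmutableReconfigurationDemo {
    public static void main(String[] args) {
        Map<String, Integer> newIds = new LinkedHashMap<>();
        newIds.put("com.example.Bar", 0);
        newIds.put("com.example.Foo", 1);
        newIds.put("com.example.Baz", 2);
        RegistrationSerializer fresh = new RegistrationSerializer(newIds);

        // IDs recorded by the previous execution (hypothetical values).
        Map<String, Integer> restored = new LinkedHashMap<>();
        restored.put("com.example.Foo", 0);
        restored.put("com.example.Bar", 1);

        RegistrationSerializer reconfigured = fresh.reconfiguredTo(restored);
        System.out.println(fresh.registrations());        // {com.example.Bar=0, com.example.Foo=1, com.example.Baz=2}
        System.out.println(reconfigured.registrations()); // {com.example.Foo=0, com.example.Bar=1, com.example.Baz=2}
    }
}

/** Hypothetical serializer whose only configuration is a class-name -> registration-ID map. */
final class RegistrationSerializer {
    private final Map<String, Integer> registrations;

    RegistrationSerializer(Map<String, Integer> registrations) {
        // Unmodifiable defensive copy: an instance can never be changed after construction.
        this.registrations = Collections.unmodifiableMap(new LinkedHashMap<>(registrations));
    }

    Map<String, Integer> registrations() {
        return registrations;
    }

    /**
     * Returns a NEW serializer whose IDs match the restored ones, leaving this instance untouched.
     * Classes unknown to the restored mapping get IDs after the highest restored ID.
     */
    RegistrationSerializer reconfiguredTo(Map<String, Integer> restoredRegistrations) {
        Map<String, Integer> merged = new LinkedHashMap<>(restoredRegistrations);
        int nextId = restoredRegistrations.values().stream().mapToInt(Integer::intValue).max().orElse(-1) + 1;
        for (String className : registrations.keySet()) {
            if (!merged.containsKey(className)) {
                merged.put(className, nextId++);
            }
        }
        return new RegistrationSerializer(merged);
    }
}
```

Because the new instance is what a compatibility check would wrap and hand back, a caller can adopt it explicitly instead of relying on a side effect.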
> h2. Proposed Approach
>  # The {{TypeSerializerSchemaCompatibility}} result class should be extended
> with an option {{compatibleWithReconfiguredSerializer(TypeSerializer)}}, which
> wraps a new, reconfigured instance of the new serializer.
>  # Callers of the compatibility check need to be aware of this case and
> respect it by using the wrapped reconfigured serializer instance whenever one
> is provided. In Flink, there are two places that perform compatibility checks
> on serializers: 1) composite serializers, which contain nested serializers and
> therefore need to check the compatibility of those nested serializers, and 2)
> state backends, which check the compatibility of the new serializer with the
> old serializer.
>  # Introduce {{CompositeTypeSerializerSnapshot}} to encapsulate the logic for
> handling reconfiguration of nested serializers: if a nested serializer of a
> composite serializer returns a new reconfigured instance of itself, then the
> result of the compatibility check on the composite serializer should also wrap
> a reconfigured version of the composite serializer that holds the reconfigured
> nested serializer. This logic should be captured in a base abstract class,
> say {{CompositeTypeSerializerSnapshot}}, so that it can be shared by many of
> Flink's composite serializers.
>  # For composite serializers that are still using the legacy, less powerful
> {{TypeSerializerConfigSnapshot}} and {{CompatibilityResult}} abstractions,
> if a nested serializer signals that it has reconfigured itself, this should be
> detected and an error should be thrown stating that the outer composite
> serializer needs to be upgraded to the new serializer snapshot and
> compatibility abstractions. This mirrors how we bridged the new
> {{TypeSerializerSchemaCompatibility}} and the old {{CompatibilityResult}}
> classes in Flink 1.7.
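On the caller side, a state backend has to switch to the wrapped serializer rather than keep the one it passed in, and a composite serializer still on the legacy abstractions has to reject a reconfigured nested result. The Java sketch below models both rules under a hypothetical `Kind`/`Compat` simplification; `StateRegistrationSketch` and its methods are illustrative names, not Flink APIs, and state migration is deliberately left out.

```java
import java.util.Objects;

class CallerSideDemo {
    public static void main(String[] args) {
        String newSerializer = "KryoSerializer[new ids]";
        Compat<String> result = Compat.compatibleWithReconfiguredSerializer("KryoSerializer[restored ids]");

        // State backend: use the wrapped instance, not the serializer that was passed in.
        System.out.println(StateRegistrationSketch.serializerForStateAccess(newSerializer, result));

        // Legacy composite: a reconfigured nested result cannot be represented, so fail loudly.
        try {
            StateRegistrationSketch.checkLegacyOuter(result, "LegacyCompositeSerializer");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}

enum Kind { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, COMPATIBLE_WITH_RECONFIGURED_SERIALIZER, INCOMPATIBLE }

/** Hypothetical, simplified stand-in for a schema compatibility result. */
final class Compat<S> {
    final Kind kind;
    final S reconfigured; // non-null only for COMPATIBLE_WITH_RECONFIGURED_SERIALIZER

    private Compat(Kind kind, S reconfigured) {
        this.kind = kind;
        this.reconfigured = reconfigured;
    }

    static <S> Compat<S> compatibleAsIs() { return new Compat<>(Kind.COMPATIBLE_AS_IS, null); }

    static <S> Compat<S> compatibleWithReconfiguredSerializer(S serializer) {
        return new Compat<>(Kind.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER, Objects.requireNonNull(serializer));
    }
}

final class StateRegistrationSketch {
    /** Returns the serializer a state backend should use for state access after the check. */
    static <S> S serializerForStateAccess(S newSerializer, Compat<S> result) {
        switch (result.kind) {
            case COMPATIBLE_AS_IS:
                return newSerializer;
            case COMPATIBLE_WITH_RECONFIGURED_SERIALIZER:
                return result.reconfigured; // never fall back to newSerializer here
            case COMPATIBLE_AFTER_MIGRATION:
                throw new UnsupportedOperationException("state migration is not modelled in this sketch");
            default:
                throw new IllegalStateException("new serializer is incompatible with the restored state");
        }
    }

    /** An outer serializer on the legacy abstractions cannot carry a reconfigured nested instance. */
    static <S> void checkLegacyOuter(Compat<S> nestedResult, String outerName) {
        if (nestedResult.kind == Kind.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER) {
            throw new IllegalStateException(outerName + " still uses the legacy TypeSerializerConfigSnapshot/"
                    + "CompatibilityResult abstractions, but a nested serializer was reconfigured; "
                    + "upgrade it to the new serializer snapshot abstractions.");
        }
    }
}
```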



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
