[ https://issues.apache.org/jira/browse/FLINK-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15991577#comment-15991577 ]

ASF GitHub Bot commented on FLINK-6190:
---------------------------------------

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/3804

    [FLINK-6190] [core] Reconfigurable TypeSerializers

    Reconfigurable TypeSerializers are the first step toward allowing serializer upgrades. The second step, activating serializer upgrades for registered state, will be a separate follow-up PR based on this one.
    
    This PR includes both [FLINK-6190](https://issues.apache.org/jira/browse/FLINK-6190) (configuration snapshot for all serializers) and [FLINK-6191](https://issues.apache.org/jira/browse/FLINK-6191) (reconfiguration logic for all serializers). They have been bundled together in this PR because they share common concerns.
    
    Since the change touches every serializer currently in Flink, the PR is split into incrementally buildable commits, each independent and covering one kind of serializer, to ease review.
    
    ## Description
    
    Reconfigurable serializers consist of two parts:
    1. Creating a snapshot of a serializer's configuration that can be written to state in a definable, versioned format. A serializer's configuration snapshot is a point-in-time view of the serializer's state and parameters at the time of the snapshot. For example, the `KryoSerializer`'s configuration should contain the order in which its classes and serializers were registered. Other serializers, like the `PojoSerializer`, have configuration parameters that change during runtime, e.g. the cached serializers for its non-registered POJO subclasses. (Why each piece of information needs to be persisted in a serializer's configuration snapshot is explained in code comments and Javadocs in the PR.)
    
    2. Reconfiguring a new serializer with the configuration snapshot of its preceding serializer, so that the new serializer can read old data written by the preceding serializer.
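
The two parts can be illustrated with a minimal, self-contained sketch. It assumes a toy serializer that, like Kryo, identifies each value's class by that class's index in the registration order. `ToyRegistrationSerializer` and `ToyConfigSnapshot` are hypothetical names, not Flink classes, and their method names merely mirror the ones proposed in this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical config snapshot: a point-in-time copy of the registration order.
final class ToyConfigSnapshot {
    final List<String> registrationOrder;

    ToyConfigSnapshot(List<String> registrationOrder) {
        this.registrationOrder = Collections.unmodifiableList(new ArrayList<>(registrationOrder));
    }
}

// Hypothetical serializer that, like Kryo, identifies a value's class by its
// registration index (its "tag").
final class ToyRegistrationSerializer {
    private final List<String> registrations = new ArrayList<>();

    ToyRegistrationSerializer(String... classNames) {
        registrations.addAll(Arrays.asList(classNames));
    }

    // Part 1: capture the current configuration (here, the registration order).
    ToyConfigSnapshot snapshotConfiguration() {
        return new ToyConfigSnapshot(registrations);
    }

    // Part 2: adopt the preceding serializer's order so that old tags keep their meaning;
    // classes known only to this serializer are appended after the restored ones.
    void reconfigure(ToyConfigSnapshot previous) {
        List<String> merged = new ArrayList<>(previous.registrationOrder);
        for (String name : registrations) {
            if (!merged.contains(name)) {
                merged.add(name);
            }
        }
        registrations.clear();
        registrations.addAll(merged);
    }

    // Tag that would be written in front of each serialized value of the given class.
    int tagFor(String className) {
        int tag = registrations.indexOf(className);
        if (tag < 0) {
            throw new IllegalArgumentException("Unregistered class: " + className);
        }
        return tag;
    }

    // Class that a tag read from the stream resolves to.
    String classForTag(int tag) {
        return registrations.get(tag);
    }
}
```

If an old serializer registered `("Order", "Customer")` and a new one registers `("Customer", "Address")`, the new serializer would decode the old tag `1` as `Address` without the `reconfigure` call. After it reconfigures with the old snapshot, tag `1` maps back to `Customer`, and `Address` is appended with tag `2`. Appending new registrations after the restored ones is just one possible policy, chosen here for simplicity.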
    
    ## New user interfaces
    
    New methods in the `TypeSerializer` interface:
    - `TypeSerializer#snapshotConfiguration()`: extracts a configuration snapshot from the serializer.
    - `TypeSerializer#reconfigure(TypeSerializerConfigSnapshot)`: reconfigures the serializer with its preceding serializer's configuration snapshot.
    
    New classes:
    - `TypeSerializerConfigSnapshot`: a `VersionedIOReadableWritable` base class that serializers extend to implement their own configuration snapshot classes. Each serializer needs to encode information about its serialization format and its required parameters in its own config snapshot class.
    - `ForwardCompatibleSerializationFormatConfig`: a special marker config that implementations of `TypeSerializer#snapshotConfiguration()` can return to signal that new serializers for the data written by this serializer do not need to be reconfigured for compatibility. For example, custom user serializers built on serialization frameworks with built-in compatibility mechanisms (e.g. Thrift, Protobuf) can simply return this marker.
    - `ReconfigureResult`: an enum representing the result of a reconfiguration, returned from `TypeSerializer#reconfigure(TypeSerializerConfigSnapshot)`. Three kinds of results are currently defined; see the class Javadoc for details.
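
The versioned-format idea behind `TypeSerializerConfigSnapshot` can be sketched with plain `java.io` streams. The snapshot writes its format version first, and the reader dispatches on that version so older layouts stay readable. This is a hypothetical, dependency-free illustration rather than Flink's `VersionedIOReadableWritable` API. The version numbers, the layout, and the `forwardCompatible` flag (standing in for the `ForwardCompatibleSerializationFormatConfig` marker) are all assumptions.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, dependency-free stand-in for a versioned serializer config snapshot.
final class ClassListConfigSnapshot {
    // Bumped whenever the written layout changes; older layouts stay readable.
    static final int CURRENT_VERSION = 2;

    final boolean forwardCompatible; // true: restored serializers need no reconfiguration
    final List<String> classNames;

    ClassListConfigSnapshot(boolean forwardCompatible, List<String> classNames) {
        this.forwardCompatible = forwardCompatible;
        this.classNames = Collections.unmodifiableList(new ArrayList<>(classNames));
    }

    // Plays the role the PR gives to ForwardCompatibleSerializationFormatConfig.
    static ClassListConfigSnapshot forwardCompatibleMarker() {
        return new ClassListConfigSnapshot(true, Collections.<String>emptyList());
    }

    byte[] write() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(CURRENT_VERSION); // the version always comes first
            out.writeBoolean(forwardCompatible);
            out.writeInt(classNames.size());
            for (String name : classNames) {
                out.writeUTF(name);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static ClassListConfigSnapshot read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readInt();
            switch (version) {
                case 1: // hypothetical older layout: class names only, no marker flag
                    return new ClassListConfigSnapshot(false, readNames(in));
                case 2: {
                    boolean marker = in.readBoolean();
                    return new ClassListConfigSnapshot(marker, readNames(in));
                }
                default:
                    throw new IllegalStateException("Unsupported snapshot version: " + version);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static List<String> readNames(DataInputStream in) throws IOException {
        int count = in.readInt();
        List<String> names = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            names.add(in.readUTF());
        }
        return names;
    }
}
```

Writing the version before the payload is what allows a later release to change the snapshot layout while still restoring snapshots written in an older layout.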
    
    ## Tests
    
    Specific tests have been added for the more complex serializers, namely:
    - `EnumSerializer` (841298a)
    - `KryoSerializer` (5b50505)
    - `PojoSerializer` (7ca5496)
    
    General `TypeSerializerConfigSnapshot` tests, such as de-/serialization of configuration snapshots, are added in c2bb1ef.
    
    Two fundamental unit tests related to config snapshots have also been added to `SerializerTestBase` (b917941).


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-6190

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3804.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3804
    
----



> Write "Serializer Configurations" metainfo along with state
> -----------------------------------------------------------
>
>                 Key: FLINK-6190
>                 URL: https://issues.apache.org/jira/browse/FLINK-6190
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing, Type Serialization System
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>
> In order for serializers to be reconfigurable on restore, we need knowledge of the previous serializer configurations, e.g. which types were registered, with which specific / default serializers, and, especially for Kryo, the order in which they were registered.
> For this, we will need serializer configuration metainfo to be self-contained 
> within the written state.
> For the implementation, we propose to include the following changes:
> - Have a new, separate {{SerializersConfig}} class that is extracted from {{ExecutionConfig}}. This new class should contain only the serializer-related configurations (e.g., {{registeredKryoTypes}}, {{registeredPojoTypes}}, etc.). The {{SerializersConfig}} class should only be used internally and should therefore be annotated with {{@Internal}}. Users should still use the {{ExecutionConfig}} to configure serializers.
> - For serializers that currently require an {{ExecutionConfig}} in their constructors, try changing them to take a {{SerializersConfig}} instead.
> - Introduce {{SerializersConfigSerializationProxy}}, which is in charge of serializing the current {{SerializersConfig}} when writing state to streams. This proxy defines the serialized format of serializer configurations and should therefore be a {{VersionedIOReadableWritable}}, as we may change the format / information written in the future.
> - Add {{SerializersConfigSerializationProxy}} to the state backends' serialization proxies (e.g. {{KeyedBackendSerializationProxy}}) so that the serializer configuration is written into state. We additionally need to make sure that state written by previous versions of the backend serialization proxies remains readable (backwards compatibility).
> For the initial version, we propose to include the following in the written serializer configuration metadata, in this order:
> 1. {{registeredPojoTypes}}
> 2. {{Throwable.class}} --> {{org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer}}: the default, Flink-specific registration for serializing throwables.
> 3. {{defaultKryoSerializers}}
> 4. {{defaultKryoSerializerClasses}}
> 5. Kryo registrations for all primitive types (and boxed versions). This is 
> to allow compatibility in case the built-in registrations for the primitive 
> types change in Kryo in the future.
> 6. {{registeredTypesWithKryoSerializers}}
> 7. {{registeredTypesWithKryoSerializerClasses}}
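
The ordered, versioned serializer-config metadata proposed above can be sketched as follows. This is a hypothetical illustration, not the actual `SerializersConfigSerializationProxy`. It assumes the configuration is flattened into named sections of `type -> serializer class` entries, that version `1` denotes an older backend format without serializer configuration, and that version `2` is the new format. `LinkedHashMap` keeps insertion order at both levels, so registrations are read back in the order they were written, which Kryo's order-dependent registrations require.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch, not Flink code: a versioned proxy that writes serializer
// registrations as named sections, in order, so a restore can rebuild them in that order.
final class SerializerConfigProxySketch {
    // Hypothetical versions: 1 = older backend format without serializer config, 2 = with it.
    static final int VERSION_WITHOUT_CONFIG = 1;
    static final int CURRENT_VERSION = 2;

    // Section name -> (type name -> serializer class name); callers pass insertion-ordered maps.
    static byte[] write(Map<String, Map<String, String>> sections) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(CURRENT_VERSION);
            out.writeInt(sections.size());
            for (Map.Entry<String, Map<String, String>> section : sections.entrySet()) {
                out.writeUTF(section.getKey());
                out.writeInt(section.getValue().size());
                for (Map.Entry<String, String> entry : section.getValue().entrySet()) {
                    out.writeUTF(entry.getKey());
                    out.writeUTF(entry.getValue());
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Map<String, Map<String, String>> read(byte[] data) {
        Map<String, Map<String, String>> sections = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readInt();
            if (version == VERSION_WITHOUT_CONFIG) {
                return sections; // backwards compatibility: older state carries no config
            }
            if (version != CURRENT_VERSION) {
                throw new IllegalStateException("Unsupported proxy version: " + version);
            }
            int sectionCount = in.readInt();
            for (int i = 0; i < sectionCount; i++) {
                String name = in.readUTF();
                int entryCount = in.readInt();
                Map<String, String> entries = new LinkedHashMap<>();
                for (int j = 0; j < entryCount; j++) {
                    String type = in.readUTF();
                    String serializer = in.readUTF();
                    entries.put(type, serializer);
                }
                sections.put(name, entries);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sections;
    }
}
```

Returning an empty configuration for version `1` is one way to keep previous-version backend state restorable. A caller could then fall back to the serializer's current configuration.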



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
