[
https://issues.apache.org/jira/browse/FLINK-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15991577#comment-15991577
]
ASF GitHub Bot commented on FLINK-6190:
---------------------------------------
GitHub user tzulitai opened a pull request:
https://github.com/apache/flink/pull/3804
[FLINK-6190] [core] Reconfigurable TypeSerializers
Reconfigurable TypeSerializers is the first step towards allowing
serializer upgrades. The second step, activating serializer upgrades for
registered state, will be a separate follow-up PR based on this one.
This PR includes both
[FLINK-6190](https://issues.apache.org/jira/browse/FLINK-6190) (configuration
snapshot for all serializers) and
[FLINK-6191](https://issues.apache.org/jira/browse/FLINK-6191) (reconfiguration
logic for all serializers). They have been bundled together in this PR because
they share common concerns.
Since the change touches all serializers currently in Flink, to ease
review, the PR is broken up into incrementally buildable commits that are
independent for different kinds of serializers.
## Description
Reconfigurable serializers consist of 2 parts:
1. Creating a snapshot of a serializer's configuration, that can be written
to state with definable versioned format. A serializer's configuration snapshot
is a point-in-time view of the current state and parameters of the serializer
at the time of the snapshot. For example, for the `KryoSerializer`, its
configuration should contain the registration order of its classes and
serializers. Other serializers like the `PojoSerializer` contain configuration
parameters that change during runtime, e.g. the cached serializers for its
non-registered POJO subclasses (the explanation of why certain information
needs to be persisted for each serializer's configuration snapshot is included
as code comments and Javadocs in the PR).
2. Reconfiguring a new serializer with the configuration snapshot of a
preceding serializer, so that it may be compatible to read old data written by
the preceding serializer.
## New user interfaces
New methods in the `TypeSerializer` interface:
- `TypeSerializer#snapshotConfiguration()`: extracts a configuration
snapshot from the serializer
- `TypeSerializer#reconfigure(TypeSerializerConfigSnapshot)`: reconfigure
the serializer with its preceding serializer's configuration snapshot.
New classes:
- `TypeSerializerConfigSnapshot`: a `VersionedIOReadableWritable` base
class for serializers to implement their own configuration snapshot class. Each
serializer needs to encode its own information about its serialization format
and its required parameters within its own config snapshot class.
- `ForwardCompatibleSerializationFormatConfig`: a special marker config
that implementations of `TypeSerializer#snapshotConfiguration()` can return to
signal that new serializers for the data written by the serializer does not
need to be reconfigured for compatibility. For example, user custom serializers
using serialization frameworks with built-in compatibility mechanisms (e.g.
Thrift, Protobut etc.) can simply just return this marker.
- `ReconfigureResult`: enum representing the result of a reconfiguration,
returned from `TypeSerializer#reconfigure(TypeSerializerConfigSnapshot)`. 3
kinds of results are currently defined - please see the Javadoc of the class
for details.
## Tests
Specific tests have been added for the more complex serializers, namely:
- `EnumSerializer` (841298a)
- `KryoSerializer` (5b50505)
- `PojoSerializer` (7ca5496)
General `TypeSerializerConfigSnapshot` tests such as de-/serialization of
configuration snapshots is added in c2bb1ef.
Also, two fundamental unit tests related to config snapshots have also been
added to `SerializerTestBase` (b917941).
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tzulitai/flink FLINK-6190
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3804.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3804
----
----
> Write "Serializer Configurations" metainfo along with state
> -----------------------------------------------------------
>
> Key: FLINK-6190
> URL: https://issues.apache.org/jira/browse/FLINK-6190
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing, Type Serialization System
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
>
> In order for serializers to be able to be reconfigured on restore, we need
> knowledge of the previous serializer configurations, e.g. what types were
> registered, with which specific / default serializers, and especially for
> Kryo, the order they were registered.
> For this, we will need serializer configuration metainfo to be self-contained
> within the written state.
> For the implementation, we propose to include the following changes:
> - Have a new separate {{SerializersConfig}} class that is extracted from
> {{ExecutionConfig}}. This new class should contain only the
> serializer-related configurations (e.g., {{registeredKryoTypes}},
> {{registeredPojoTypes}}, etc.). The {{SerializersConfig}} class should only
> be internally used, and therefore annotated with {{Internal}}. Users should
> still use the {{ExecutionConfig}} to configure serializers.
> - For serializers that previously require a {{ExecutionConfig}} in
> constructors, try changing them to take a {{SerializersConfig}} instead.
> - Introduce {{SerializersConfigSerializationProxy}}, which is in charge of
> serializing the current {{SerializersConfig}} when writing state to streams.
> This proxy defines the the serialized format of serializer configurations,
> therefore should be a {{VersionedIOReadableWritable}} as we may change the
> format / information written in the future.
> - Add {{SerializersConfigSerializationProxy}} into state backends
> serialization proxies (e.g. {{KeyedBackendSerializationProxy}}) so that the
> serializer configuration is written into state. Need to additionally make
> sure backwards compatibility of previous-version backend serialization
> proxies.
> For the initial version, we propose to include the following within the
> written serialization config metadata (ordered):
> 1. {{registeredPojoTypes}}
> 2. {{Throwable.class}} -->
> {{org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer}} default
> Flink-specific registry for serializing throwables.
> 3. {{defaultKryoSerializers}}
> 4. {{defaultKryoSerializerClasses}}
> 5. Kryo registrations for all primitive types (and boxed versions). This is
> to allow compatibility in case the built-in registrations for the primitive
> types change in Kryo in the future.
> 6. {{registeredTypesWithKryoSerializers}}
> 7. {{registeredTypesWithKryoSerializerClasses}}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)