[
https://issues.apache.org/jira/browse/FLINK-6478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028183#comment-16028183
]
ASF GitHub Bot commented on FLINK-6478:
---------------------------------------
Github user alpinegizmo commented on a diff in the pull request:
https://github.com/apache/flink/pull/4006#discussion_r118903664
--- Diff: docs/dev/stream/state.md ---
@@ -429,3 +429,120 @@ public static class CounterSource
Some operators might need the information when a checkpoint is fully
acknowledged by Flink to communicate that with the outside world. In this case
see the `org.apache.flink.runtime.state.CheckpointListener` interface.
+## Custom serialization for managed state
+
+This section is a guideline for users who require custom serialization for their state, covering how
+to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using
+Flink's own serializers, you can skip this section.
+
+### Using custom serializers
+
+As demonstrated in the above examples, when registering a managed operator or keyed state, a `StateDescriptor` is required
+to specify the state's name, as well as information about the type of the state. The type information is used by Flink's
+[type serialization framework](../types_serialization.html) to create appropriate serializers for the state.
+
+It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states,
+simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation:
+
+{% highlight java %}
+ListStateDescriptor<Tuple2<String, Integer>> descriptor =
+    new ListStateDescriptor<>(
+        "state-name",
+        new TypeSerializer<Tuple2<String, Integer>>() {...}); // your own serializer implementation
+
+checkpointedState = getRuntimeContext().getListState(descriptor);
+{% endhighlight %}
+
+### Handling serializer upgrades and compatibility
+
+Flink allows changing the serializers used to read and write managed state, so that users are not locked into any
+specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer
+that comes with the `StateDescriptor` used to access the state in the restored job) is checked for compatibility
+and then takes over as the serializer for the state.
+
+A compatible serializer is one that can read the previously serialized bytes of the state and writes the state
+in the same binary format. The new serializer's compatibility is checked through the following two methods
+of the `TypeSerializer` abstract class:
+
+{% highlight java %}
+public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
+public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot);
+{% endhighlight %}
+
+In short, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a
+point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the
+checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot
+is passed to the _new_ serializer of the same state via the counterpart method, `ensureCompatibility`. This
+method serves both as a check of whether the new serializer is compatible and as a hook to possibly reconfigure
+the new serializer if it is incompatible.
+
+Note that Flink's own serializers are implemented such that they are at least compatible with themselves, i.e. when the
+same serializer is used for the state in the restored job, the serializers will reconfigure themselves to be compatible
+with their previous configuration.
+
+The following subsections provide guidelines for implementing these two methods when using custom serializers.
+
+#### Implementing the `snapshotConfiguration` method
+
+The serializer's configuration snapshot should capture enough information that, on restore, the information
+carried over to the new serializer for the state is sufficient for it to determine whether it is compatible.
+This typically includes the serializer's parameters or the binary format of the serialized data;
+generally, anything that allows the new serializer to decide whether it can read previously serialized
+bytes and whether it writes in the same binary format.
+
+How the serializer's configuration snapshot is written to and read from checkpoints is fully customizable. Below
+is the base class for all serializer configuration snapshot implementations, `TypeSerializerConfigSnapshot`:
+
+{% highlight java %}
+public abstract class TypeSerializerConfigSnapshot extends VersionedIOReadableWritable {
+    public abstract int getVersion();
+    public void read(DataInputView in) throws IOException {...}
+    public void write(DataOutputView out) throws IOException {...}
+}
+{% endhighlight %}
+
+The `read` and `write` methods define how the configuration is read from and written to the checkpoint. The base
+implementations contain logic to read and write the version of the configuration snapshot, so they should be extended
+(calling the `super` implementation) rather than completely overridden.
+
+The version of the configuration snapshot is determined through the `getVersion` method. Versioning the serializer
+configuration snapshot is how compatible configurations are maintained, since the information included in the configuration
+may change over time. When reading from the checkpoint, you can use the `getReadVersion` method to determine the version
+of the written configuration and adapt the read logic to that specific version.
+
+<span class="label label-danger">Attention</span> Do not mistaken the version of the serializer's configuration snapshot
+to be related with upgrading the serializer. The exact same serializer can have different implementations of its
--- End diff --
I think you meant to say "mistake" rather than "mistaken", but how about a simpler construction:

The version of the serializer's configuration snapshot is **not** related to upgrading the serializer.
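
To make the snapshot versioning scheme in the diff concrete, here is a minimal standalone sketch of the pattern; it does not use Flink. A base class writes the snapshot's version number first, a subclass extends `write`/`read` by calling `super`, and the read logic branches on the version actually found in the bytes. `VersionedSnapshot`, `MySerializerSnapshot` and the helper methods are hypothetical stand-ins, not Flink's `TypeSerializerConfigSnapshot`, and plain `java.io.DataInput`/`DataOutput` replace Flink's `DataInputView`/`DataOutputView`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class ConfigSnapshotSketch {

    // Hypothetical stand-in for a versioned config snapshot base class:
    // the base logic writes/reads the snapshot's own version number first.
    abstract static class VersionedSnapshot {
        private int readVersion = -1;

        abstract int getVersion();

        // Version found in the bytes that were read (may be older than getVersion()).
        int getReadVersion() {
            return readVersion;
        }

        void write(DataOutput out) throws IOException {
            out.writeInt(getVersion());
        }

        void read(DataInput in) throws IOException {
            readVersion = in.readInt();
        }
    }

    // Hypothetical snapshot: version 1 stored only a charset, version 2 adds a flag.
    static class MySerializerSnapshot extends VersionedSnapshot {
        String charset = "UTF-8";
        boolean nullable = false;

        @Override
        int getVersion() {
            return 2;
        }

        @Override
        void write(DataOutput out) throws IOException {
            super.write(out); // extend the base logic instead of replacing it
            out.writeUTF(charset);
            out.writeBoolean(nullable);
        }

        @Override
        void read(DataInput in) throws IOException {
            super.read(in); // consumes the version written by the base class
            charset = in.readUTF();
            // Only version 2+ snapshots contain the flag; older ones keep the default.
            nullable = getReadVersion() >= 2 && in.readBoolean();
        }
    }

    static MySerializerSnapshot readFrom(byte[] bytes) {
        try {
            MySerializerSnapshot snapshot = new MySerializerSnapshot();
            snapshot.read(new DataInputStream(new ByteArrayInputStream(bytes)));
            return snapshot;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Bytes as written by the current (version 2) implementation.
    static byte[] currentBytes(MySerializerSnapshot snapshot) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            snapshot.write(new DataOutputStream(bytes));
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Bytes as a hypothetical older (version 1) implementation would have written them.
    static byte[] legacyV1Bytes(String charset) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(1);
            out.writeUTF(charset);
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        MySerializerSnapshot old = readFrom(legacyV1Bytes("ISO-8859-1"));
        System.out.println(old.getReadVersion() + " " + old.charset + " " + old.nullable);

        MySerializerSnapshot current = new MySerializerSnapshot();
        current.nullable = true;
        MySerializerSnapshot restored = readFrom(currentBytes(current));
        System.out.println(restored.getReadVersion() + " " + restored.charset + " " + restored.nullable);
    }
}
```

The version-1 snapshot is still readable because `read` only consumes the flag when the stored version says it is there. This version describes the layout of the snapshot itself, not the binary format of the state data, which is the distinction the suggested sentence makes.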
> Add documentation on how to upgrade serializers for managed state
> -----------------------------------------------------------------
>
> Key: FLINK-6478
> URL: https://issues.apache.org/jira/browse/FLINK-6478
> Project: Flink
> Issue Type: Sub-task
> Components: Documentation
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Critical
> Fix For: 1.3.0
>
>
> There needs to be documentation that explains how to use the new serializer
> upgrade APIs in {{TypeSerializer}}, and how the methods work with
> checkpoints. This documentation should probably be placed under "Application
> development --> Streaming --> Working with State".
> Ideally, it should also come with a minimal example for users that perhaps
> use serialization frameworks that already have built-in backwards
> compatibility (such as Thrift).
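
As a rough illustration of how the two methods described in the diff interact with checkpoints, the following standalone sketch walks through the handshake: a snapshot is taken at checkpoint time, and on restore the new serializer checks it and, where possible, reconfigures itself. It does not depend on Flink; `StringSerializer`, `ConfigSnapshot` and the `Compatibility` enum are hypothetical, only loosely modeled on `TypeSerializer#snapshotConfiguration`, `TypeSerializer#ensureCompatibility` and `CompatibilityResult`, and the charset-dependent binary format is an assumption chosen for illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class CompatibilitySketch {

    // Hypothetical analog of a compatibility check result.
    enum Compatibility { COMPATIBLE, REQUIRES_MIGRATION }

    // Hypothetical configuration snapshot: the parameter that determines the binary format.
    static final class ConfigSnapshot {
        final String charset;

        ConfigSnapshot(String charset) {
            this.charset = charset;
        }
    }

    // Hypothetical string serializer whose binary format depends on its charset.
    static final class StringSerializer {
        private static final Set<String> SUPPORTED =
                new HashSet<>(Arrays.asList("UTF-8", "UTF-16"));

        private String charset;

        StringSerializer(String charset) {
            this.charset = charset;
        }

        String charset() {
            return charset;
        }

        // Checkpoint time: capture what a future serializer needs to judge compatibility.
        ConfigSnapshot snapshotConfiguration() {
            return new ConfigSnapshot(charset);
        }

        // Restore time: check the snapshot stored with the checkpoint and, where
        // possible, reconfigure to keep reading and writing the previous format.
        Compatibility ensureCompatibility(ConfigSnapshot previous) {
            if (SUPPORTED.contains(previous.charset)) {
                this.charset = previous.charset;
                return Compatibility.COMPATIBLE;
            }
            return Compatibility.REQUIRES_MIGRATION;
        }
    }

    public static void main(String[] args) {
        // Old job: the snapshot is taken when a checkpoint is performed.
        ConfigSnapshot stored = new StringSerializer("UTF-16").snapshotConfiguration();

        // Restored job registers a serializer with a different default charset.
        StringSerializer restored = new StringSerializer("UTF-8");
        Compatibility result = restored.ensureCompatibility(stored);
        System.out.println(result + " " + restored.charset());

        // A previous configuration this serializer cannot honor.
        Compatibility unknown = new StringSerializer("UTF-8")
                .ensureCompatibility(new ConfigSnapshot("EBCDIC"));
        System.out.println(unknown);
    }
}
```

Reconfiguring to the previous charset, rather than simply reporting incompatibility, mirrors the diff's note that Flink's own serializers reconfigure themselves to match their previous configuration. A previous configuration the new serializer cannot honor is reported as requiring migration.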