GitHub user alpinegizmo commented on a diff in the pull request:
https://github.com/apache/flink/pull/4006#discussion_r118903664
--- Diff: docs/dev/stream/state.md ---
@@ -429,3 +429,120 @@ public static class CounterSource
Some operators might need the information when a checkpoint is fully
acknowledged by Flink to communicate that with the outside world. In this case
see the `org.apache.flink.runtime.state.CheckpointListener` interface.
+## Custom serialization for managed state
+
+This section is a guide for users who need custom serialization for their state. It covers how
+to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you simply use
+Flink's own serializers, you can skip this section.
+
+### Using custom serializers
+
+As demonstrated in the examples above, registering managed operator or keyed state requires a `StateDescriptor`
+that specifies the state's name as well as information about the state's type. Flink's
+[type serialization framework](../types_serialization.html) uses this type information to create
+appropriate serializers for the state.
+
+It is also possible to bypass this completely and have Flink use your own custom serializer for managed state,
+simply by instantiating the `StateDescriptor` directly with your own `TypeSerializer` implementation:
+
+{% highlight java %}
+ListStateDescriptor<Tuple2<String, Integer>> descriptor =
+    new ListStateDescriptor<>(
+        "state-name",
+        new TypeSerializer<Tuple2<String, Integer>>() {...});
+
+checkpointedState = getRuntimeContext().getListState(descriptor);
+{% endhighlight %}
+
+### Handling serializer upgrades and compatibility
+
+Flink allows changing the serializers used to read and write managed state, so that users are not locked into any
+specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer
+that comes with the `StateDescriptor` used to access the state in the restored job) is checked for compatibility
+and then becomes the serializer for the state.
+
+A serializer is compatible if it can read the previously serialized bytes of the state and also writes the state
+in the same binary format. The means to check the new serializer's compatibility
+are provided by the following two methods of the `TypeSerializer` abstract class:
+
+{% highlight java %}
+public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
+public abstract CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot);
+{% endhighlight %}
+
+In brief, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a
+point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the
+checkpoint as the state's metadata. When the checkpoint is used to restore a job, that configuration snapshot
+is passed to the _new_ serializer of the same state via the counterpart method, `ensureCompatibility`. This
+method serves both as a check of whether the new serializer is compatible and as a hook to possibly reconfigure
+the new serializer if it is incompatible.
+
+Note that Flink's own serializers are implemented to be at least compatible with themselves, i.e. when the
+same serializer is used for the state in the restored job, the serializers reconfigure themselves to be compatible
+with their previous configuration.
+
+The following subsections give guidelines for implementing these two methods when using custom serializers.
+
+#### Implementing the `snapshotConfiguration` method
+
+The serializer's configuration snapshot should capture enough information that, on restore, the information
+carried over to the new serializer for the state is sufficient for it to determine whether or not it is compatible.
+This typically includes the serializer's parameters or the binary format of the serialized data;
+generally, anything that allows the new serializer to decide whether it can read previously serialized
+bytes and whether it writes in the same binary format.
+
+How the serializer's configuration snapshot is written to and read from checkpoints is fully customizable. Below
+is the base class for all serializer configuration snapshot implementations, `TypeSerializerConfigSnapshot`.
+
+{% highlight java %}
+public abstract class TypeSerializerConfigSnapshot extends VersionedIOReadableWritable {
+    public abstract int getVersion();
+    public void read(DataInputView in) throws IOException {...}
+    public void write(DataOutputView out) throws IOException {...}
+}
+{% endhighlight %}
+
+The `read` and `write` methods define how the configuration is read from and written to the checkpoint. The base
+implementations contain the logic to read and write the version of the configuration snapshot, so subclasses should
+extend them (by calling the `super` implementations) rather than override them completely.
+
+The version of the configuration snapshot is determined by the `getVersion` method. Versioning the serializer
+configuration snapshot is how compatible configurations are maintained, as the information included in the configuration
+may change over time. When reading from the checkpoint, you can use the `getReadVersion` method to determine the version
+of the written configuration and adapt the read logic to that specific version.
+
+<span class="label label-danger">Attention</span> Do not mistaken the version of the serializer's configuration snapshot
+to be related with upgrading the serializer. The exact same serializer can have different implementations of its
--- End diff --
I think you meant to say "mistake" rather than "mistaken", but how about a simpler construction:
The version of the serializer's configuration snapshot is **not** related to upgrading the serializer.
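For illustration, the versioned configuration-snapshot pattern described in the diff can be sketched with plain `java.io` streams, without Flink on the classpath. Every class and method below (`VersionedConfigSnapshot`, `MyConfigSnapshot`, `MySerializer`, `isCompatibleWith`, `toBytes`, `fromBytes`) is a hypothetical stand-in, not Flink's `TypeSerializerConfigSnapshot` or `TypeSerializer` API. `DataInputStream`/`DataOutputStream` take the place of `DataInputView`/`DataOutputView`, and a plain boolean check takes the place of the `CompatibilityResult` returned by `ensureCompatibility`. The sketch assumes a serializer whose only parameters are a charset name and a length-prefix flag.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a versioned snapshot base class (NOT Flink's API).
abstract class VersionedConfigSnapshot {
    private int readVersion = -1;

    // Layout version that this class writes.
    abstract int getVersion();

    // Layout version found in the bytes by read(); lets subclasses adapt.
    int getReadVersion() { return readVersion; }

    // Base logic writes the version header; subclasses extend it via super.write(out).
    void write(DataOutputStream out) throws IOException { out.writeInt(getVersion()); }

    // Base logic reads the version header; subclasses extend it via super.read(in).
    void read(DataInputStream in) throws IOException { readVersion = in.readInt(); }
}

// Hypothetical snapshot: layout v1 stored only the charset, v2 added a flag.
class MyConfigSnapshot extends VersionedConfigSnapshot {
    static final int VERSION = 2;
    String charset;
    boolean lengthPrefixed;

    MyConfigSnapshot() {}

    MyConfigSnapshot(String charset, boolean lengthPrefixed) {
        this.charset = charset;
        this.lengthPrefixed = lengthPrefixed;
    }

    @Override
    int getVersion() { return VERSION; }

    @Override
    void write(DataOutputStream out) throws IOException {
        super.write(out); // keep the base version header
        out.writeUTF(charset);
        out.writeBoolean(lengthPrefixed);
    }

    @Override
    void read(DataInputStream in) throws IOException {
        super.read(in); // consume the version header first
        charset = in.readUTF();
        // v1 snapshots carry no flag; assume v1 serializers always length-prefixed.
        lengthPrefixed = getReadVersion() >= 2 ? in.readBoolean() : true;
    }

    // Serializes this snapshot, as a checkpoint would store it.
    byte[] toBytes() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try {
            write(new DataOutputStream(bytes));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Restores a snapshot from bytes written in any supported layout version.
    static MyConfigSnapshot fromBytes(byte[] data) {
        MyConfigSnapshot s = new MyConfigSnapshot();
        try {
            s.read(new DataInputStream(new ByteArrayInputStream(data)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return s;
    }
}

// Hypothetical serializer exposing the two hooks discussed in the diff.
class MySerializer {
    final String charset;
    final boolean lengthPrefixed;

    MySerializer(String charset, boolean lengthPrefixed) {
        this.charset = charset;
        this.lengthPrefixed = lengthPrefixed;
    }

    MyConfigSnapshot snapshotConfiguration() { return new MyConfigSnapshot(charset, lengthPrefixed); }

    // Compatible if it can read the old bytes and writes the same binary format.
    boolean isCompatibleWith(MyConfigSnapshot old) {
        return charset.equals(old.charset) && lengthPrefixed == old.lengthPrefixed;
    }
}
```

In this sketch the snapshot layout moved from version 1 to version 2, while `MySerializer`'s data format stayed the same. That is the distinction the suggested wording draws. `getReadVersion` only tells `read` which snapshot layout it is parsing. It says nothing about whether the serializer itself was upgraded.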