Github user alpinegizmo commented on a diff in the pull request:
https://github.com/apache/flink/pull/4006#discussion_r118904745
--- Diff: docs/dev/stream/state.md ---
@@ -429,3 +429,120 @@ public static class CounterSource
Some operators might need the information when a checkpoint is fully
acknowledged by Flink to communicate that with the outside world. In this case
see the `org.apache.flink.runtime.state.CheckpointListener` interface.
+## Custom serialization for managed state
+
+This section is intended as a guideline for users who require custom serialization for their state, covering how
+to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using
+Flink's own serializers, this section is irrelevant and can be skipped.
+
+### Using custom serializers
+
+As demonstrated in the above examples, when registering a managed operator or keyed state, a `StateDescriptor` is required
+to specify the state's name, as well as information about the type of the state. The type information is used by Flink's
+[type serialization framework](../types_serialization.html) to create appropriate serializers for the state.
+
+It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states,
+simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation:
+
+{% highlight java %}
+public class CustomTypeSerializer extends TypeSerializer<Tuple2<String, Integer>> {...}
+
+ListStateDescriptor<Tuple2<String, Integer>> descriptor =
+    new ListStateDescriptor<>(
+        "state-name",
+        new CustomTypeSerializer());
+
+checkpointedState = getRuntimeContext().getListState(descriptor);
+{% endhighlight %}
+
+### Handling serializer upgrades and compatibility
+
+Flink allows changing the serializers used to read and write managed state, so that users are not locked into any
+specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer
+that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility,
+and then becomes the new serializer for the state.
+
+A compatible serializer means that the serializer is capable of reading previously serialized bytes of the state,
+and that the binary format it writes for the state remains identical. The means to check the new serializer's compatibility
+are provided through the following two methods of the `TypeSerializer` interface:
+
+{% highlight java %}
+public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
+public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot);
+{% endhighlight %}
+
+In brief, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a
+point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the
+checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot
+will be used to confront the _new_ serializer of the same state via the counterpart method, `ensureCompatibility`. This
--- End diff --
I don't understand the use of "confront" in this context. Perhaps you mean
something like "determine the compatibility of" or "verify the compatibility
of"?
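The snapshot-then-verify cycle in the quoted section can be illustrated with a minimal, Flink-independent sketch. All types here (`ConfigSnapshot`, `Compatibility`, `VersionedSerializer`, `StringSerializer`, `CompatibilityDemo`) are hypothetical stand-ins, not Flink's actual `TypeSerializerConfigSnapshot` / `CompatibilityResult` API, and a single format version number stands in for a real serializer configuration. It shows the restore-time step as "verifying the compatibility of" the new serializer against the stored snapshot:

```java
// Hypothetical stand-ins for Flink's types; NOT the real Flink API.

/** Outcome of the compatibility check (hypothetical analogue of CompatibilityResult). */
enum Compatibility { COMPATIBLE, REQUIRES_MIGRATION }

/** Point-in-time view of a serializer's configuration, stored with a checkpoint. */
final class ConfigSnapshot {
    final int formatVersion;

    ConfigSnapshot(int formatVersion) {
        this.formatVersion = formatVersion;
    }
}

/** Minimal contract mirroring the two TypeSerializer methods discussed in the diff. */
interface VersionedSerializer {
    ConfigSnapshot snapshotConfiguration();

    Compatibility ensureCompatibility(ConfigSnapshot previous);
}

/** Hypothetical serializer whose binary format is identified by a version number. */
final class StringSerializer implements VersionedSerializer {
    private final int formatVersion;

    StringSerializer(int formatVersion) {
        this.formatVersion = formatVersion;
    }

    @Override
    public ConfigSnapshot snapshotConfiguration() {
        // Called at checkpoint time; the result is stored as the state's metadata.
        return new ConfigSnapshot(formatVersion);
    }

    @Override
    public Compatibility ensureCompatibility(ConfigSnapshot previous) {
        // Called at restore time on the NEW serializer: it is compatible only if it
        // reads and writes the same binary format as the one recorded in the snapshot.
        return previous.formatVersion == formatVersion
                ? Compatibility.COMPATIBLE
                : Compatibility.REQUIRES_MIGRATION;
    }
}

class CompatibilityDemo {
    /** Simulates a restore: verifies the new serializer against the checkpointed snapshot. */
    static Compatibility restore(ConfigSnapshot fromCheckpoint, VersionedSerializer newSerializer) {
        return newSerializer.ensureCompatibility(fromCheckpoint);
    }

    public static void main(String[] args) {
        // Checkpoint: snapshot the configuration of the serializer currently in use.
        ConfigSnapshot stored = new StringSerializer(1).snapshotConfiguration();

        // Restore: the serializer from the new job's StateDescriptor checks itself against it.
        System.out.println(restore(stored, new StringSerializer(1))); // COMPATIBLE
        System.out.println(restore(stored, new StringSerializer(2))); // REQUIRES_MIGRATION
    }
}
```

The direction of the call is the point of the sketch: the old serializer only produces the snapshot, while the decision is made by the new serializer inspecting that snapshot.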