GitHub user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4006#discussion_r118906667
  
    --- Diff: docs/dev/stream/state.md ---
    @@ -429,3 +429,120 @@ public static class CounterSource
     
     Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface.
     
    +## Custom serialization for managed state
    +
    +This section is targeted as a guideline for users who require using custom serialization for their state, covering how
    +to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using
    +Flink's own serializers, this section is irrelevant and can be skipped. 
    +
    +### Using custom serializers
    +
    +As demonstrated in the above examples, when registering a managed operator or keyed state, a `StateDescriptor` is required
    +to specify the state's name, as well as information about the type of the state. The type information is used by Flink's
    +[type serialization framework](../types_serialization.html) to create appropriate serializers for the state.
    +
    +It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states,
    +simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation:
    +
    +{% highlight java %}
    +ListStateDescriptor<Tuple2<String, Integer>> descriptor =
    +    new ListStateDescriptor<>(
    +        "state-name",
    +        new TypeSerializer<> {...});
    +
    +checkpointedState = getRuntimeContext().getListState(descriptor);
    +{% endhighlight %}
    +
    +### Handling serializer upgrades and compatibility
    +
    +Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any
    +specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer
    +that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility,
    +and is replaced as the new serializer for the state.
    +
    +A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state,
    +and the written binary format of the state also remains identical. The means to check the new serializer's compatibility
    +is provided through the following two methods of the `TypeSerializer` interface:
    +
    +{% highlight java %}
    +public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
    +public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot);
    +{% endhighlight %}
    +
    +Briefly speaking, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a
    +point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the
    +checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot
    +will be used to confront the _new_ serializer of the same state via the counterpart method, `ensureCompatibility`. This
    --- End diff --
    
    The intent was just to explain that the checkpointed serializer config snapshot will be provided to the new serializer so that the new serializer can verify its own compatibility. I'll use "provide" instead of "confront".
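
To make the intended meaning concrete, here is a minimal, self-contained sketch of that handoff. The classes below (`ConfigSnapshot`, `Compatibility`, `VersionedSerializer`) are simplified, hypothetical stand-ins and not Flink's actual `TypeSerializerConfigSnapshot`, `CompatibilityResult`, or `TypeSerializer`; the "format version" field is likewise an assumption made only for illustration.

```java
/**
 * Illustrative sketch only: simplified stand-ins, NOT Flink's real classes.
 * It shows a config snapshot taken at checkpoint time being provided to the
 * new serializer on restore, which then decides whether it is compatible.
 */
public class SnapshotHandoffSketch {

    /** Hypothetical point-in-time view of a serializer's configuration. */
    public static final class ConfigSnapshot {
        final int formatVersion;

        ConfigSnapshot(int formatVersion) {
            this.formatVersion = formatVersion;
        }
    }

    /** Hypothetical outcome of the compatibility check. */
    public enum Compatibility { COMPATIBLE, REQUIRES_MIGRATION }

    /** Hypothetical serializer exposing the two hooks discussed in the diff. */
    public static final class VersionedSerializer {
        private final int formatVersion;

        VersionedSerializer(int formatVersion) {
            this.formatVersion = formatVersion;
        }

        /** Called at checkpoint time; the result is stored with the checkpoint. */
        ConfigSnapshot snapshotConfiguration() {
            return new ConfigSnapshot(formatVersion);
        }

        /** Called on restore; the stored snapshot is provided to the new serializer. */
        Compatibility ensureCompatibility(ConfigSnapshot restored) {
            return restored.formatVersion == formatVersion
                    ? Compatibility.COMPATIBLE
                    : Compatibility.REQUIRES_MIGRATION;
        }
    }

    /** Simulates a checkpoint with the old serializer, then a restore with the new one. */
    public static Compatibility checkpointThenRestore(int oldVersion, int newVersion) {
        ConfigSnapshot stored = new VersionedSerializer(oldVersion).snapshotConfiguration();
        return new VersionedSerializer(newVersion).ensureCompatibility(stored);
    }

    public static void main(String[] args) {
        System.out.println(checkpointThenRestore(1, 1)); // prints COMPATIBLE
        System.out.println(checkpointThenRestore(1, 2)); // prints REQUIRES_MIGRATION
    }
}
```

The point of the sketch is the direction of the call: the restored snapshot is an argument to the new serializer's check, which is why "provide" describes it better.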

