[ 
https://issues.apache.org/jira/browse/FLINK-6478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028194#comment-16028194
 ] 

ASF GitHub Bot commented on FLINK-6478:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4006#discussion_r118906667
  
    --- Diff: docs/dev/stream/state.md ---
    @@ -429,3 +429,120 @@ public static class CounterSource
     
     Some operators might need the information when a checkpoint is fully 
acknowledged by Flink to communicate that with the outside world. In this case 
see the `org.apache.flink.runtime.state.CheckpointListener` interface.
     
    +## Custom serialization for managed state
    +
    +This section is targeted as a guideline for users who require using custom 
serialization for their state, covering how
    +to provide a custom serializer and how to handle upgrades to the 
serializer for compatibility. If you're simply using
    +Flink's own serializers, this section is irrelevant and can be skipped. 
    +
    +### Using custom serializers
    +
    +As demonstrated in the above examples, when registering a managed operator 
or keyed state, a `StateDescriptor` is required
    +to specify the state's name, as well as information about the type of the 
state. The type information is used by Flink's
    +[type serialization framework](../types_serialization.html) to create 
appropriate serializers for the state.
    +
    +It is also possible to completely bypass this and let Flink use your own 
custom serializer to serialize managed states,
    +simply by directly instantiating the `StateDescriptor` with your own 
`TypeSerializer` implementation:
    +
    +{% highlight java %}
    +ListStateDescriptor<Tuple2<String, Integer>> descriptor =
    +    new ListStateDescriptor<>(
    +        "state-name",
    +        new TypeSerializer<Tuple2<String, Integer>>() {...});
    +
    +checkpointedState = getRuntimeContext().getListState(descriptor);
    +{% endhighlight %}
    +
    +### Handling serializer upgrades and compatibility
    +
    +Flink allows changing the serializers used to read and write managed 
state, so that users are not locked in to any
    +specific serialization. When state is restored, the new serializer 
registered for the state (i.e., the serializer
    +that comes with the `StateDescriptor` used to access the state in the 
restored job) will be checked for compatibility,
    +and is then set as the new serializer for the state.
    +
    +A compatible serializer is one that is capable of 
reading the previously serialized bytes of the state,
    +and the written binary format of the state also remains identical. The 
means to check the new serializer's compatibility
    +is provided through the following two methods of the `TypeSerializer` 
interface:
    +
    +{% highlight java %}
    +public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
    +public abstract CompatibilityResult 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot);
    +{% endhighlight %}
    +
    +Briefly speaking, every time a checkpoint is performed, the 
`snapshotConfiguration` method is called to create a
    +point-in-time view of the state serializer's configuration. The returned 
configuration snapshot is stored along with the
    +checkpoint as the state's metadata. When the checkpoint is used to restore 
a job, that serializer configuration snapshot
    +will be used to confront the _new_ serializer of the same state via the 
counterpart method, `ensureCompatibility`. This
    --- End diff --
    
    The word "confront" was only meant to explain that the checkpointed 
serializer config snapshot will be provided to the new serializer to verify 
its compatibility. I'll use "provide" instead.

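To make the flow in the comment above concrete, here is a minimal, self-contained sketch of a configuration snapshot being handed to a new serializer at restore time. It does not use Flink's classes: `ConfigSnapshot`, `Compatibility`, and `VersionedSerializer` are hypothetical stand-ins, and the version-number comparison is an assumed compatibility rule chosen only for illustration. Only the method names `snapshotConfiguration` and `ensureCompatibility` are taken from the diff.

```java
import java.util.Objects;

/** Illustrative stand-ins only; these are NOT Flink's actual classes. */
final class SerializerCompatSketch {

    /** Point-in-time view of a serializer's configuration, stored with the checkpoint. */
    static final class ConfigSnapshot {
        final int formatVersion;

        ConfigSnapshot(int formatVersion) {
            this.formatVersion = formatVersion;
        }
    }

    /** Hypothetical outcome of the compatibility check. */
    enum Compatibility { COMPATIBLE, REQUIRES_MIGRATION }

    /** Hypothetical serializer exposing the two hooks named in the diff. */
    static final class VersionedSerializer {
        private final int formatVersion;

        VersionedSerializer(int formatVersion) {
            this.formatVersion = formatVersion;
        }

        /** Called when a checkpoint is taken: capture the current configuration. */
        ConfigSnapshot snapshotConfiguration() {
            return new ConfigSnapshot(formatVersion);
        }

        /**
         * Called on the NEW serializer at restore time. The snapshot written by
         * the OLD serializer is provided so the new one can verify compatibility.
         * Assumed rule for this sketch: formats match only if versions are equal.
         */
        Compatibility ensureCompatibility(ConfigSnapshot restored) {
            Objects.requireNonNull(restored, "restored snapshot");
            return restored.formatVersion == formatVersion
                    ? Compatibility.COMPATIBLE
                    : Compatibility.REQUIRES_MIGRATION;
        }
    }

    public static void main(String[] args) {
        // Checkpoint time: the old serializer writes its configuration snapshot.
        ConfigSnapshot checkpointed = new VersionedSerializer(1).snapshotConfiguration();

        // Restore time: the snapshot is provided to whichever serializer the
        // restored job registers for the same state.
        System.out.println(new VersionedSerializer(1).ensureCompatibility(checkpointed));
        System.out.println(new VersionedSerializer(2).ensureCompatibility(checkpointed));
    }
}
```

In this sketch the check always runs on the new serializer and the old snapshot is only an input to it, which is the relationship that "provide" is meant to convey.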

> Add documentation on how to upgrade serializers for managed state
> -----------------------------------------------------------------
>
>                 Key: FLINK-6478
>                 URL: https://issues.apache.org/jira/browse/FLINK-6478
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Documentation
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Critical
>             Fix For: 1.3.0
>
>
> There needs to be documentation that explains how to use the new serializer 
> upgrade APIs in {{TypeSerializer}}, and how the methods work with 
> checkpoints. This documentation should probably be placed under "Application 
> development --> Streaming --> Working with State".
> Ideally, it should also come with a minimal example for users that perhaps 
> use serialization frameworks that already have built-in backwards 
> compatibility (such as Thrift).



