[ https://issues.apache.org/jira/browse/FLINK-6478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028183#comment-16028183 ]

ASF GitHub Bot commented on FLINK-6478:
---------------------------------------

Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4006#discussion_r118903664
  
    --- Diff: docs/dev/stream/state.md ---
    @@ -429,3 +429,120 @@ public static class CounterSource
     
     Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface.
     
    +## Custom serialization for managed state
    +
    +This section is intended as a guideline for users who require custom serialization for their state, covering how
    +to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using
    +Flink's own serializers, this section is irrelevant and can be skipped.
    +
    +### Using custom serializers
    +
    +As demonstrated in the examples above, when registering a managed operator or keyed state, a `StateDescriptor` is required
    +to specify the state's name, as well as information about the type of the state. The type information is used by Flink's
    +[type serialization framework](../types_serialization.html) to create appropriate serializers for the state.
    +
    +It is also possible to bypass this completely and let Flink use your own custom serializer to serialize managed state,
    +simply by instantiating the `StateDescriptor` directly with your own `TypeSerializer` implementation:
    +
    +{% highlight java %}
    +ListStateDescriptor<Tuple2<String, Integer>> descriptor =
    +    new ListStateDescriptor<>(
    +        "state-name",
    +        new TypeSerializer<Tuple2<String, Integer>>() {...});
    +
    +checkpointedState = getRuntimeContext().getListState(descriptor);
    +{% endhighlight %}
    +
    +### Handling serializer upgrades and compatibility
    +
    +Flink allows changing the serializers used to read and write managed state, so that users are not locked into any
    +specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer
    +that comes with the `StateDescriptor` used to access the state in the restored job) is checked for compatibility
    +and then replaces the previous serializer for the state.
    +
    +A compatible serializer is one that can read the previously serialized bytes of the state and that also writes the state
    +in an identical binary format. The new serializer's compatibility is checked through the following two methods of the
    +`TypeSerializer` abstract class:
    +
    +{% highlight java %}
    +public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
    +public abstract CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot);
    +{% endhighlight %}
    +
    +In brief, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a
    +point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the
    +checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot
    +is passed to the _new_ serializer of the same state through the counterpart method, `ensureCompatibility`. This
    +method serves both as a check of whether the new serializer is compatible and as a hook to reconfigure
    +the new serializer if it is incompatible.
    +
    +Note that Flink's own serializers are implemented such that they are at least compatible with themselves, i.e. when the
    +same serializer is used for the state in the restored job, the serializers will reconfigure themselves to be compatible
    +with their previous configuration.
    +
    +The following subsections provide guidelines for implementing these two methods in custom serializers.
    +
    +#### Implementing the `snapshotConfiguration` method
    +
    +The serializer's configuration snapshot should capture enough information that, on restore, the new serializer for the
    +state can use it to determine whether or not it is compatible. This typically includes the serializer's parameters or
    +the binary format of the serialized data; generally, anything that lets the new serializer decide whether it can read
    +the previously serialized bytes and whether it writes in the same binary format.
    +
    +How the serializer's configuration snapshot is written to and read from checkpoints is fully customizable. Below is
    +the base class for all serializer configuration snapshot implementations, `TypeSerializerConfigSnapshot`.
    +
    +{% highlight java %}
    +public abstract class TypeSerializerConfigSnapshot extends VersionedIOReadableWritable {
    +  public abstract int getVersion();
    +  public void read(DataInputView in) throws IOException {...}
    +  public void write(DataOutputView out) throws IOException {...}
    +}
    +{% endhighlight %}
    +
    +The `read` and `write` methods define how the configuration is read from and written to the checkpoint. The base
    +implementations contain logic to read and write the version of the configuration snapshot, so subclasses should extend
    +them (calling the `super` implementations) rather than override them completely.
    +
    +The version of the configuration snapshot is determined through the `getVersion` method. Versioning the serializer's
    +configuration snapshot is how compatible configurations are maintained, since the information included in the
    +configuration may change over time. When reading from the checkpoint, you can use the `getReadVersion` method to
    +determine the version of the written configuration and adapt the read logic to that specific version.
    +
    +<span class="label label-danger">Attention</span> Do not mistaken the version of the serializer's configuration snapshot
    +to be related with upgrading the serializer. The exact same serializer can have different implementations of its
    --- End diff --
    
    I think you meant to say "mistake" rather than "mistaken", but how about a simpler construction:
    
      The version of the serializer's configuration snapshot is **not** related to upgrading the serializer.
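To make the versioning point in the diff concrete, here is a minimal, self-contained sketch. It deliberately avoids Flink's classes: `VersionedSnapshotSketch`, `FixedWidthConfigSketch`, their fields, and the version numbers are all hypothetical, and plain `java.io` streams stand in for `DataOutputView`/`DataInputView`. It shows the pattern the diff describes: the base class reads and writes the version header, the subclass extends `read`/`write` rather than replacing them, and the read version selects the read logic. The serializer that owns this configuration is assumed unchanged between versions; only the snapshot format evolves.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical stand-in for a versioned base class that owns the version
// header, similar in spirit to VersionedIOReadableWritable. NOT Flink code.
abstract class VersionedSnapshotSketch {
    private int readVersion = -1;

    /** Version that this class writes. */
    abstract int getVersion();

    void write(DataOutputStream out) throws IOException {
        out.writeInt(getVersion());
    }

    void read(DataInputStream in) throws IOException {
        readVersion = in.readInt();
        if (readVersion < 1 || readVersion > getVersion()) {
            throw new IOException("Unsupported snapshot version: " + readVersion);
        }
    }

    /** Version found in the bytes being read; valid after read(). */
    int getReadVersion() {
        return readVersion;
    }
}

// Hypothetical config snapshot for a fixed-width record serializer.
// Version 1 stored only recordWidth; version 2 added the bigEndian flag.
class FixedWidthConfigSketch extends VersionedSnapshotSketch {
    static final int VERSION = 2;

    int recordWidth;
    boolean bigEndian = true; // assumed default when reading version-1 snapshots

    @Override
    int getVersion() {
        return VERSION;
    }

    @Override
    void write(DataOutputStream out) throws IOException {
        super.write(out); // extend the base logic (version header), don't replace it
        out.writeInt(recordWidth);
        out.writeBoolean(bigEndian);
    }

    @Override
    void read(DataInputStream in) throws IOException {
        super.read(in); // reads and validates the version header
        recordWidth = in.readInt();
        if (getReadVersion() >= 2) {
            bigEndian = in.readBoolean();
        }
    }

    static byte[] toBytes(FixedWidthConfigSketch config) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            config.write(out);
        }
        return bytes.toByteArray();
    }

    static FixedWidthConfigSketch fromBytes(byte[] data) throws IOException {
        FixedWidthConfigSketch config = new FixedWidthConfigSketch();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            config.read(in);
        }
        return config;
    }
}
```

Reading version-1 bytes leaves `bigEndian` at its assumed default, while version-2 bytes carry the flag explicitly; in both cases the version number describes the snapshot format, not an upgrade of the serializer.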


> Add documentation on how to upgrade serializers for managed state
> -----------------------------------------------------------------
>
>                 Key: FLINK-6478
>                 URL: https://issues.apache.org/jira/browse/FLINK-6478
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Documentation
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Critical
>             Fix For: 1.3.0
>
>
> There needs to be documentation that explains how to use the new serializer
> upgrade APIs in {{TypeSerializer}}, and how the methods work with
> checkpoints. This documentation should probably be placed under "Application
> development --> Streaming --> Working with State".
> Ideally, it should also come with a minimal example for users of
> serialization frameworks that already have built-in backwards
> compatibility (such as Thrift).
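A minimal example of the `snapshotConfiguration`/`ensureCompatibility` handshake described in the diff could look like the sketch below. It is a simplified model, not Flink code: `CompatSketch`, `WidthSnapshotSketch`, and `FixedWidthSerializerSketch` are hypothetical names, whereas the real methods take a `TypeSerializerConfigSnapshot` and return a `CompatibilityResult`. The assumed serializer writes fixed-width records; on restore it either confirms that the old width matches, reconfigures itself to keep using the old width, or reports that migration is needed.

```java
// Hypothetical, simplified stand-ins for the roles that
// TypeSerializerConfigSnapshot and CompatibilityResult play in the diff.
// None of these names or signatures are Flink's API.
enum CompatSketch { COMPATIBLE, REQUIRES_MIGRATION }

final class WidthSnapshotSketch {
    final int recordWidth;

    WidthSnapshotSketch(int recordWidth) {
        this.recordWidth = recordWidth;
    }
}

final class FixedWidthSerializerSketch {
    private int recordWidth;             // current binary format parameter
    private final int maxSupportedWidth; // widest format this version can handle

    FixedWidthSerializerSketch(int recordWidth, int maxSupportedWidth) {
        this.recordWidth = recordWidth;
        this.maxSupportedWidth = maxSupportedWidth;
    }

    // Taken at every checkpoint and stored alongside the state.
    WidthSnapshotSketch snapshotConfiguration() {
        return new WidthSnapshotSketch(recordWidth);
    }

    // Called on restore with the snapshot written by the previous serializer.
    CompatSketch ensureCompatibility(WidthSnapshotSketch previous) {
        if (previous.recordWidth == recordWidth) {
            return CompatSketch.COMPATIBLE;
        }
        if (previous.recordWidth <= maxSupportedWidth) {
            // Reconfigure so that we read and keep writing the old format.
            recordWidth = previous.recordWidth;
            return CompatSketch.COMPATIBLE;
        }
        // Cannot handle the old format: the state would have to be migrated.
        return CompatSketch.REQUIRES_MIGRATION;
    }

    int recordWidth() {
        return recordWidth;
    }
}
```

A serializer built on a framework with its own backwards compatibility (the Thrift case mentioned above) would presumably take the first or second branch in most cases, since the framework itself can read older encodings.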



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
