[ https://issues.apache.org/jira/browse/FLINK-9574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16693189#comment-16693189 ]

ASF GitHub Bot commented on FLINK-9574:
---------------------------------------

igalshilman commented on a change in pull request #7124: [FLINK-9574] [doc] Rework documentation for custom state serializers and state evolution
URL: https://github.com/apache/flink/pull/7124#discussion_r234981861
 
 

 ##########
 File path: docs/dev/stream/state/custom_serialization.md
 ##########
 @@ -66,125 +69,166 @@ checkpointedState = getRuntimeContext.getListState(descriptor)
 </div>
 </div>
 
-Note that Flink writes state serializers along with the state as metadata. In certain cases on restore (see following
-subsections), the written serializer needs to be deserialized and used. Therefore, it is recommended to avoid using
-anonymous classes as your state serializers. Anonymous classes do not have a guarantee on the generated classname,
-which varies across compilers and depends on the order that they are instantiated within the enclosing class, which can
-easily cause the previously written serializer to be unreadable (since the original class can no longer be found in the
-classpath).
-
-### Handling serializer upgrades and compatibility
-
-Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any
-specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer
-that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility,
-and is replaced as the new serializer for the state.
-
-A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state,
-and the written binary format of the state also remains identical. The means to check the new serializer's compatibility
-is provided through the following two methods of the `TypeSerializer` interface:
-
-{% highlight java %}
-public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
-public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot);
-{% endhighlight %}
-
-Briefly speaking, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a
-point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the
-checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot
-will be provided to the _new_ serializer of the same state via the counterpart method, `ensureCompatibility`, to verify
-compatibility of the new serializer. This method serves as a check for whether or not the new serializer is compatible,
-as well as a hook to possibly reconfigure the new serializer in the case that it is incompatible.
+## State serializers and schema evolution
 
-Note that Flink's own serializers are implemented such that they are at least compatible with themselves, i.e. when the
-same serializer is used for the state in the restored job, the serializer's will reconfigure themselves to be compatible
-with their previous configuration.
+This section explains the user-facing abstractions related to state serialization and schema evolution, as well as the
+internal details needed to understand how Flink interacts with these abstractions.
 
-The following subsections illustrate guidelines to implement these two methods when using custom serializers.
+When restoring from savepoints, Flink allows changing the serializers used to read and write previously registered state,
+so that users are not locked in to any specific serialization schema. When state is restored, a new serializer will be
+registered for the state (i.e., the serializer that comes with the `StateDescriptor` used to access the state in the
+restored job). This new serializer may have a different schema than that of the prior serializer. Therefore, when
+implementing state serializers, beyond the basic logic of reading and writing data, keep in mind how the
+serialization schema may need to change in the future.
 
-#### Implementing the `snapshotConfiguration` method
+In this context, the term *schema* refers interchangeably to the *data model* of a state type and to the
+*serialized binary format* of a state type. Generally speaking, the schema can change in a few cases:
 
-The serializer's configuration snapshot should capture enough information such that on restore, the information
-carried over to the new serializer for the state is sufficient for it to determine whether or not it is compatible.
-This could typically contain information about the serializer's parameters or binary format of the serialized data;
-generally, anything that allows the new serializer to decide whether or not it can be used to read previous serialized
-bytes, and that it writes in the same binary format.
+ 1. Data schema of the state type has evolved, e.g. adding or removing a field from a POJO that is used as state.
+ 2. More generally, the serializer's serialization format is deliberately upgraded.
+ 3. Configuration of the serializer has changed, e.g. Kryo types are registered in a different order than in the
+ previous execution, leading to mismatching registration ids for types.
+
+In order for the new execution to have information about the *written schema* of state and detect whether or not the
+schema has changed, upon taking a savepoint of an operator's state, a *snapshot* of the state serializer needs to be
+written along with the state bytes. This is abstracted as a `TypeSerializerSnapshot`, explained in the next subsection.
 
-How the serializer's configuration snapshot is written to and read from checkpoints is fully customizable. The below
-is the base class for all serializer configuration snapshot implementations, the `TypeSerializerConfigSnapshot`.
+### The `TypeSerializerSnapshot` abstraction
 
+<div data-lang="java" markdown="1">
 {% highlight java %}
-public abstract TypeSerializerConfigSnapshot extends VersionedIOReadableWritable {
-  public abstract int getVersion();
-  public void read(DataInputView in) {...}
-  public void write(DataOutputView out) {...}
+public interface TypeSerializerSnapshot<T> {
+    int getCurrentVersion();
+    void writeSnapshot(DataOutputView out) throws IOException;
+    void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException;
+    TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer);
+    TypeSerializer<T> restoreSerializer();
 }
 {% endhighlight %}
+</div>
 
-The `read` and `write` methods define how the configuration is read from and written to the checkpoint. The base
-implementations contain logic to read and write the version of the configuration snapshot, so it should be extended and
-not completely overridden.
-
-The version of the configuration snapshot is determined through the `getVersion` method. Versioning for the serializer
-configuration snapshot is the means to maintain compatible configurations, as information included in the configuration
-may change over time. By default, configuration snapshots are only compatible with the current version (as returned by
-`getVersion`). To indicate that the configuration is compatible with other versions, override the `getCompatibleVersions`
-method to return more version values. When reading from the checkpoint, you can use the `getReadVersion` method to
-determine the version of the written configuration and adapt the read logic to the specific version.
-
-<span class="label label-danger">Attention</span> The version of the serializer's configuration snapshot is **not**
-related to upgrading the serializer. The exact same serializer can have different implementations of its
-configuration snapshot, for example when more information is added to the configuration to allow more comprehensive
-compatibility checks in the future.
-
-One limitation of implementing a `TypeSerializerConfigSnapshot` is that an empty constructor must be present. The empty
-constructor is required when reading the configuration snapshot from checkpoints.
-
-#### Implementing the `ensureCompatibility` method
-
-The `ensureCompatibility` method should contain logic that performs checks against the information about the previous
-serializer carried over via the provided `TypeSerializerConfigSnapshot`, basically doing one of the following:
-
-  * Check whether the serializer is compatible, while possibly reconfiguring itself (if required) so that it may be
-    compatible. Afterwards, acknowledge with Flink that the serializer is compatible.
-
-  * Acknowledge that the serializer is incompatible and that state migration is required before Flink can proceed with
-    using the new serializer.
-
-The above cases can be translated to code by returning one of the following from the `ensureCompatibility` method:
-
-  * **`CompatibilityResult.compatible()`**: This acknowledges that the new serializer is compatible, or has been reconfigured to
-    be compatible, and Flink can proceed with the job with the serializer as is.
-
-  * **`CompatibilityResult.requiresMigration()`**: This acknowledges that the serializer is incompatible, or cannot be
-    reconfigured to be compatible, and requires a state migration before the new serializer can be used. State migration
-    is performed by using the previous serializer to read the restored state bytes to objects, and then serialized again
-    using the new serializer.
-
-  * **`CompatibilityResult.requiresMigration(TypeDeserializer deserializer)`**: This acknowledgement has equivalent semantics
-    to `CompatibilityResult.requiresMigration()`, but in the case that the previous serializer cannot be found or loaded
-    to read the restored state bytes for the migration, a provided `TypeDeserializer` can be used as a fallback resort.
-
-<span class="label label-danger">Attention</span> Currently, as of Flink 1.3, if the result of the compatibility check
-acknowledges that state migration needs to be performed, the job simply fails to restore from the checkpoint as state
-migration is currently not available. The ability to migrate state will be introduced in future releases.
-
-### Managing `TypeSerializer` and `TypeSerializerConfigSnapshot` classes in user code
-
-Since `TypeSerializer`s and `TypeSerializerConfigSnapshot`s are written as part of checkpoints along with the state
-values, the availability of the classes within the classpath may affect restore behaviour.
-
-`TypeSerializer`s are directly written into checkpoints using Java Object Serialization. In the case that the new
-serializer acknowledges that it is incompatible and requires state migration, it will be required to be present to be
-able to read the restored state bytes. Therefore, if the original serializer class no longer exists or has been modified
-(resulting in a different `serialVersionUID`) as a result of a serializer upgrade for the state, the restore would
-not be able to proceed. The alternative to this requirement is to provide a fallback `TypeDeserializer` when
-acknowledging that state migration is required, using `CompatibilityResult.requiresMigration(TypeDeserializer deserializer)`.
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public abstract class TypeSerializer<T> {    
+    
+    // ...
+    
+    public abstract TypeSerializerSnapshot<T> snapshotConfiguration();
+}
+{% endhighlight %}
+</div>
 
-The class of `TypeSerializerConfigSnapshot`s in the restored checkpoint must exist in the classpath, as they are
-fundamental components to compatibility checks on upgraded serializers and would not be able to be restored if the class
-is not present. Since configuration snapshots are written to checkpoints using custom serialization, the implementation
-of the class is free to be changed, as long as compatibility of the configuration change is handled using the versioning
-mechanisms in `TypeSerializerConfigSnapshot`.
+A serializer's `TypeSerializerSnapshot` is a piece of information that serves as the single source of truth about
+the state serializer's configuration and write schema. What is written as the serializer snapshot when taking a
+savepoint, and how it is read back at restore time, is defined in the `writeSnapshot` and `readSnapshot` methods.
+
+Note that the snapshot's own write schema may also need to change over time (e.g. when you wish to add more information
+about the serializer to the snapshot). To facilitate this, snapshots are versioned, with the current version
+number defined in the `getCurrentVersion` method. On restore, when the serializer snapshot is read from a savepoint,
+the version of the schema in which the snapshot was written is provided to the `readSnapshot` method so that
+the read implementation can handle different versions.
+
+At restore time, the logic that detects whether or not the new serializer's schema has changed should be implemented in
+the `resolveSchemaCompatibility` method. When previously registered state is registered again with new serializers in the
+restored execution of an operator, the new serializer is provided to the prior serializer's snapshot via this method.
+This method returns a `TypeSerializerSchemaCompatibility` representing the result of the compatibility resolution,
+which can be one of the following:
+
+ 1. **`TypeSerializerSchemaCompatibility.compatibleAsIs()`**: this result signals that the new serializer is compatible,
+ meaning that the new serializer has an identical schema to the prior serializer. It is possible that the new
+ serializer has been reconfigured in the `resolveSchemaCompatibility` method so that it is compatible.
+ 2. **`TypeSerializerSchemaCompatibility.compatibleAfterMigration()`**: this result signals that the new serializer has a
+ different serialization schema, and it is possible to migrate from the old schema by using the prior serializer
+ (which recognizes the old schema) to read bytes into state objects, and then rewriting the objects back to bytes with
+ the new serializer (which recognizes the new schema).
+ 3. **`TypeSerializerSchemaCompatibility.incompatible()`**: this result signals that the new serializer has a
+ different serialization schema, but it is not possible to migrate from the old schema.
+
+The last remaining detail is how the prior serializer is obtained when migration is required.
+Another important role of a serializer's `TypeSerializerSnapshot` is that it serves as a factory to restore
+the prior serializer. More specifically, the `TypeSerializerSnapshot` should implement the `restoreSerializer` method
+to instantiate a serializer instance that recognizes the prior serializer's schema and configuration, and can therefore
+safely read data written by the prior serializer.
+
+### How Flink interacts with the `TypeSerializer` and `TypeSerializerSnapshot` abstractions
+
+To wrap up, this section describes how Flink, or more specifically the state backends, interacts with these
+abstractions. The interaction differs slightly depending on the state backend, but this is orthogonal
+to the implementation of state serializers and their serializer snapshots.
+
+#### Off-heap state backends (e.g. `RocksDBStateBackend`)
+
+ 1. **Register new state with a state serializer that has schema _A_**
+  - The registered `TypeSerializer` for the state is used to read / write state on every state access.
+  - State is written in schema *A*.
+ 2. **Take a savepoint**
+  - The serializer snapshot is extracted via the `TypeSerializer#snapshotConfiguration` method.
+  - The serializer snapshot is written to the savepoint, as well as the already-deserialized state bytes (with schema *A*).
 
 Review comment:
   did you mean: already-serialized?
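For illustration, here is a minimal sketch of a versioned serializer snapshot modeled on the `TypeSerializerSnapshot` interface proposed in the diff. It does not import Flink, so that it stays self-contained. `StateSerializer`, `SerializerSnapshot` and `SchemaCompatibility` are simplified, hypothetical stand-ins for `TypeSerializer`, `TypeSerializerSnapshot` and `TypeSerializerSchemaCompatibility`. `java.io.DataOutput`/`DataInput` replace `DataOutputView`/`DataInputView`. `FieldsSerializer` and `FieldsSnapshot` are invented for the example, including the older "version 1" snapshot format.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified stand-in for TypeSerializerSchemaCompatibility (not the Flink class). */
enum SchemaCompatibility { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

/** Simplified stand-in for TypeSerializer; java.io streams replace Flink's data views. */
interface StateSerializer<T> {
    void serialize(T value, DataOutput out) throws IOException;
    T deserialize(DataInput in) throws IOException;
    SerializerSnapshot<T> snapshotConfiguration();
}

/** Same shape as the TypeSerializerSnapshot interface quoted in the diff. */
interface SerializerSnapshot<T> {
    int getCurrentVersion();
    void writeSnapshot(DataOutput out) throws IOException;
    void readSnapshot(int readVersion, DataInput in, ClassLoader userCodeClassLoader) throws IOException;
    SchemaCompatibility resolveSchemaCompatibility(StateSerializer<T> newSerializer);
    StateSerializer<T> restoreSerializer();
}

/** Hypothetical serializer: writes the long values of a fixed, ordered list of fields. */
final class FieldsSerializer implements StateSerializer<Map<String, Long>> {
    final List<String> fields;

    FieldsSerializer(List<String> fields) { this.fields = new ArrayList<>(fields); }

    public void serialize(Map<String, Long> row, DataOutput out) throws IOException {
        for (String f : fields) out.writeLong(row.getOrDefault(f, 0L)); // absent fields are written as 0
    }

    public Map<String, Long> deserialize(DataInput in) throws IOException {
        Map<String, Long> row = new LinkedHashMap<>();
        for (String f : fields) row.put(f, in.readLong());
        return row;
    }

    public SerializerSnapshot<Map<String, Long>> snapshotConfiguration() {
        return new FieldsSnapshot(fields);
    }
}

/** Hypothetical snapshot: the serializer's write schema is its list of field names. */
final class FieldsSnapshot implements SerializerSnapshot<Map<String, Long>> {
    private List<String> fields;

    FieldsSnapshot() {} // restore path: readSnapshot fills in the fields
    FieldsSnapshot(List<String> fields) { this.fields = new ArrayList<>(fields); }

    // Hypothetical history: version 1 stored the names as one comma-separated string,
    // version 2 (current) stores a count followed by each name.
    public int getCurrentVersion() { return 2; }

    public void writeSnapshot(DataOutput out) throws IOException {
        out.writeInt(fields.size());
        for (String f : fields) out.writeUTF(f);
    }

    public void readSnapshot(int readVersion, DataInput in, ClassLoader userCodeClassLoader)
            throws IOException {
        fields = new ArrayList<>();
        if (readVersion == 1) {
            String joined = in.readUTF();
            if (!joined.isEmpty()) fields.addAll(Arrays.asList(joined.split(",")));
        } else if (readVersion == 2) {
            int count = in.readInt();
            for (int i = 0; i < count; i++) fields.add(in.readUTF());
        } else {
            throw new IOException("Unknown snapshot version: " + readVersion);
        }
    }

    public SchemaCompatibility resolveSchemaCompatibility(StateSerializer<Map<String, Long>> newSerializer) {
        if (!(newSerializer instanceof FieldsSerializer)) {
            return SchemaCompatibility.INCOMPATIBLE; // this sketch only understands its own format
        }
        List<String> newFields = ((FieldsSerializer) newSerializer).fields;
        return newFields.equals(fields)
                ? SchemaCompatibility.COMPATIBLE_AS_IS
                : SchemaCompatibility.COMPATIBLE_AFTER_MIGRATION; // read with old fields, rewrite with new
    }

    public StateSerializer<Map<String, Long>> restoreSerializer() {
        return new FieldsSerializer(fields); // reads data written with the prior field list
    }
}
```

In this sketch, the snapshot records only the field list, which is the serializer's write schema. That is enough both to decide compatibility and to rebuild the prior serializer in `restoreSerializer()` when migration is needed.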

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a dedicated documentation page for state evolution
> ------------------------------------------------------
>
>                 Key: FLINK-9574
>                 URL: https://issues.apache.org/jira/browse/FLINK-9574
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Documentation, State Backends, Checkpointing, Type Serialization System
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.8.0
>
>
> Currently, the only bit of documentation about serializer upgrades / state evolution is
> [https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/custom_serialization.html#handling-serializer-upgrades-and-compatibility.],
> which only explains things at an API level.
> State evolution over time has proved to be a rather complex topic that is often overlooked by users. Users would
> probably benefit from an actual, full-grown dedicated page that covers the API, some necessary internal details
> regarding the interplay of state serializers, best practices, and caution notices.
> I propose to add this documentation as a subpage under Streaming/State & Fault-Tolerance/.
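The reviewed diff describes the `compatibleAfterMigration()` path: read the old bytes with the prior serializer returned by `restoreSerializer()`, then rewrite the objects with the new serializer. That path can be sketched for a backend that keeps state as serialized bytes, as in the off-heap case. This is an illustration under stated assumptions, not Flink code. `Ser`, `IntSer`, `LongSer` and `Migration` are hypothetical. In this example, schema *A* stores a counter as a 32-bit int and schema *B* widens it to a 64-bit long.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal serializer interface (not Flink's TypeSerializer). */
interface Ser<T> {
    void write(T value, DataOutput out) throws IOException;
    T read(DataInput in) throws IOException;
}

/** Prior serializer, schema A: the counter is stored as a 32-bit int. */
final class IntSer implements Ser<Number> {
    public void write(Number value, DataOutput out) throws IOException { out.writeInt(value.intValue()); }
    public Number read(DataInput in) throws IOException { return in.readInt(); }
}

/** New serializer, schema B: the same counter widened to a 64-bit long. */
final class LongSer implements Ser<Number> {
    public void write(Number value, DataOutput out) throws IOException { out.writeLong(value.longValue()); }
    public Number read(DataInput in) throws IOException { return in.readLong(); }
}

final class Migration {
    /** Serializes one state value, as a bytes-based backend does on every state write. */
    static <T> byte[] toBytes(Ser<T> ser, T value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        ser.write(value, new DataOutputStream(buffer));
        return buffer.toByteArray();
    }

    /** Deserializes one stored state value. */
    static <T> T fromBytes(Ser<T> ser, byte[] bytes) throws IOException {
        return ser.read(new DataInputStream(new ByteArrayInputStream(bytes)));
    }

    /** Reads every entry with the prior serializer and rewrites it with the new one. */
    static <T> List<byte[]> migrate(List<byte[]> stored, Ser<T> prior, Ser<T> next) throws IOException {
        List<byte[]> migrated = new ArrayList<>();
        for (byte[] entry : stored) {
            migrated.add(toBytes(next, fromBytes(prior, entry)));
        }
        return migrated;
    }
}
```

Each entry is fully deserialized and re-serialized, so in this sketch the migration cost grows linearly with the amount of stored state.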



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
