carp84 commented on code in PR #15761:
URL: https://github.com/apache/flink/pull/15761#discussion_r966832586
##########
docs/content.zh/docs/dev/datastream/fault-tolerance/custom_serialization.md:
##########
@@ -69,30 +64,22 @@ checkpointedState =
getRuntimeContext.getListState(descriptor)
{{< /tab >}}
{{< /tabs >}}
-## State serializers and schema evolution
+## State serializers and schema evolution (格式演进)
-This section explains the user-facing abstractions related to state
serialization and schema evolution, and necessary
-internal details about how Flink interacts with these abstractions.
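+This section introduces the user-facing abstractions related to state serialization and structure upgrade (结构升级), as well as some internal details of how Flink interacts with these abstractions.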
Review Comment:
```suggestion
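This section introduces the user-facing abstractions related to state serialization and schema evolution (格式演进), as well as some internal details of how Flink interacts with these abstractions.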
```
Suggest keeping a uniform translation for "schema evolution" within the page.
##########
docs/content.zh/docs/dev/datastream/fault-tolerance/custom_serialization.md:
##########
@@ -69,30 +64,22 @@ checkpointedState =
getRuntimeContext.getListState(descriptor)
{{< /tab >}}
{{< /tabs >}}
-## State serializers and schema evolution
+## State serializers and schema evolution (格式演进)
-This section explains the user-facing abstractions related to state
serialization and schema evolution, and necessary
-internal details about how Flink interacts with these abstractions.
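+This section introduces the user-facing abstractions related to state serialization and structure upgrade (结构升级), as well as some internal details of how Flink interacts with these abstractions.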
-When restoring from savepoints, Flink allows changing the serializers used to
read and write previously registered state,
-so that users are not locked in to any specific serialization schema. When
state is restored, a new serializer will be
-registered for the state (i.e., the serializer that comes with the
`StateDescriptor` used to access the state in the
-restored job). This new serializer may have a different schema than that of
the previous serializer. Therefore, when
-implementing state serializers, besides the basic logic of reading / writing
data, another important thing to keep in
-mind is how the serialization schema can be changed in the future.
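+When restoring from a savepoint, Flink allows the state serializer to be changed, thereby supporting structure upgrade (结构升级). After state is restored, it is registered with the new serializer (i.e., the serializer specified by the `StateDescriptor` in the restored job).
+The new serializer may have a different structure (结构) from the previous one. Therefore, when implementing a state serializer, besides correctly handling the basic logic of reading and writing data, another important consideration is how to support future structure upgrades (结构升级) of the state.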
-When speaking of *schema*, in this context the term is interchangeable between
referring to the *data model* of a state
-type and the *serialized binary format* of a state type. The schema, generally
speaking, can change for a few cases:
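+Here, *structure* (结构) may refer either to the *data model* of the state or to its *serialized binary format*. Generally speaking, the structure can change in the following cases: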
Review Comment:
```suggestion
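Here, *format* (格式) may refer either to the *data model* of the state or to its *serialized binary format*. Generally speaking, the format can change in the following cases: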
```
Ditto
##########
docs/content.zh/docs/dev/datastream/fault-tolerance/custom_serialization.md:
##########
@@ -69,30 +64,22 @@ checkpointedState =
getRuntimeContext.getListState(descriptor)
{{< /tab >}}
{{< /tabs >}}
-## State serializers and schema evolution
+## State serializers and schema evolution (格式演进)
-This section explains the user-facing abstractions related to state
serialization and schema evolution, and necessary
-internal details about how Flink interacts with these abstractions.
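+This section introduces the user-facing abstractions related to state serialization and structure upgrade (结构升级), as well as some internal details of how Flink interacts with these abstractions.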
-When restoring from savepoints, Flink allows changing the serializers used to
read and write previously registered state,
-so that users are not locked in to any specific serialization schema. When
state is restored, a new serializer will be
-registered for the state (i.e., the serializer that comes with the
`StateDescriptor` used to access the state in the
-restored job). This new serializer may have a different schema than that of
the previous serializer. Therefore, when
-implementing state serializers, besides the basic logic of reading / writing
data, another important thing to keep in
-mind is how the serialization schema can be changed in the future.
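+When restoring from a savepoint, Flink allows the state serializer to be changed, thereby supporting structure upgrade (结构升级). After state is restored, it is registered with the new serializer (i.e., the serializer specified by the `StateDescriptor` in the restored job).
+The new serializer may have a different structure (结构) from the previous one. Therefore, when implementing a state serializer, besides correctly handling the basic logic of reading and writing data, another important consideration is how to support future structure upgrades (结构升级) of the state.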
-When speaking of *schema*, in this context the term is interchangeable between
referring to the *data model* of a state
-type and the *serialized binary format* of a state type. The schema, generally
speaking, can change for a few cases:
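+Here, *structure* (结构) may refer either to the *data model* of the state or to its *serialized binary format*. Generally speaking, the structure can change in the following cases: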
- 1. Data schema of the state type has evolved, i.e. adding or removing a field
from a POJO that is used as state.
- 2. Generally speaking, after a change to the data schema, the serialization
format of the serializer will need to be upgraded.
- 3. Configuration of the serializer has changed.
-
-In order for the new execution to have information about the *written schema*
of state and detect whether or not the
-schema has changed, upon taking a savepoint of an operator's state, a
*snapshot* of the state serializer needs to be
-written along with the state bytes. This is abstracted a
`TypeSerializerSnapshot`, explained in the next subsection.
+ 1. The structure (结构) of the state changes, e.g. a field is added to or removed from a POJO class.
Review Comment:
```suggestion
1. The format (格式) of the state changes, e.g. a field is added to or removed from a POJO class.
```
Ditto
##########
docs/content.zh/docs/dev/datastream/fault-tolerance/custom_serialization.md:
##########
@@ -112,109 +99,72 @@ public abstract class TypeSerializer<T> {
}
```
-A serializer's `TypeSerializerSnapshot` is a point-in-time information that
serves as the single source of truth about
-the state serializer's write schema, as well as any additional information
mandatory to restore a serializer that
-would be identical to the given point-in-time. The logic about what should be
written and read at restore time
-as the serializer snapshot is defined in the `writeSnapshot` and
`readSnapshot` methods.
-
-Note that the snapshot's own write schema may also need to change over time
(e.g. when you wish to add more information
-about the serializer to the snapshot). To facilitate this, snapshots are
versioned, with the current version
-number defined in the `getCurrentVersion` method. On restore, when the
serializer snapshot is read from savepoints,
-the version of the schema in which the snapshot was written in will be
provided to the `readSnapshot` method so that
-the read implementation can handle different versions.
-
-At restore time, the logic that detects whether or not the new serializer's
schema has changed should be implemented in
-the `resolveSchemaCompatibility` method. When previous registered state is
registered again with new serializers in the
-restored execution of an operator, the new serializer is provided to the
previous serializer's snapshot via this method.
-This method returns a `TypeSerializerSchemaCompatibility` representing the
result of the compatibility resolution,
-which can be one of the following:
-
- 1. **`TypeSerializerSchemaCompatibility.compatibleAsIs()`**: this result
signals that the new serializer is compatible,
- meaning that the new serializer has identical schema with the previous
serializer. It is possible that the new
- serializer has been reconfigured in the `resolveSchemaCompatibility` method
so that it is compatible.
- 2. **`TypeSerializerSchemaCompatibility.compatibleAfterMigration()`**: this
result signals that the new serializer has a
- different serialization schema, and it is possible to migrate from the old
schema by using the previous serializer
- (which recognizes the old schema) to read bytes into state objects, and then
rewriting the object back to bytes with
- the new serializer (which recognizes the new schema).
- 3. **`TypeSerializerSchemaCompatibility.incompatible()`**: this result
signals that the new serializer has a
- different serialization schema, but it is not possible to migrate from the
old schema.
-
-The last bit of detail is how the previous serializer is obtained in the case
that migration is required.
-Another important role of a serializer's `TypeSerializerSnapshot` is that it
serves as a factory to restore
-the previous serializer. More specifically, the `TypeSerializerSnapshot`
should implement the `restoreSerializer` method
-to instantiate a serializer instance that recognizes the previous serializer's
schema and configuration, and can therefore
-safely read data written by the previous serializer.
-
-### How Flink interacts with the `TypeSerializer` and `TypeSerializerSnapshot`
abstractions
-
-To wrap up, this section concludes how Flink, or more specifically the state
backends, interact with the
-abstractions. The interaction is slightly different depending on the state
backend, but this is orthogonal
-to the implementation of state serializers and their serializer snapshots.
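+A serializer's `TypeSerializerSnapshot` contains the serializer's structure information, as well as any additional information required to restore the serializer. The logic for writing and reading the serializer snapshot is implemented in `writeSnapshot` and `readSnapshot`.
+
+Note that the format of the snapshot itself may also need to change over time (e.g., when more information about the serializer is added to the snapshot). To facilitate this, snapshots are versioned, and the current version is returned by `getCurrentVersion`. On restore, after the snapshot is read from the savepoint, the version in which it was written is passed to `readSnapshot`, so that the implementation can handle each version accordingly.
+
+On restore, the logic that detects whether the serializer's format has changed should be implemented in `resolveSchemaCompatibility`, which receives the new serializer as its argument. The method returns a `TypeSerializerSchemaCompatibility` representing the compatibility result, which can be one of the following: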
Review Comment:
```suggestion
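On restore, the logic that detects whether the serializer's format has changed should be implemented in `resolveSchemaCompatibility`. When previously registered state is registered again with a new serializer during the restore of an operator, this method receives the new serializer as its argument and returns a `TypeSerializerSchemaCompatibility` representing the result of the compatibility check, which can be one of the following: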
```
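A minimal, Flink-independent sketch of the decision `resolveSchemaCompatibility` makes, for anyone checking the wording above. It assumes a serializer can be reduced to an integer format version. `CompatResult`, `ModelSerializer` and `ModelSerializerSnapshot` are hypothetical stand-ins, not Flink classes; the real method returns a `TypeSerializerSchemaCompatibility`.

```java
import java.util.Set;

/** Hypothetical stand-in for the three results of TypeSerializerSchemaCompatibility. */
enum CompatResult { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

/** Hypothetical model of a state serializer, reduced to the format version it writes. */
final class ModelSerializer {
    final int formatVersion;

    ModelSerializer(int formatVersion) {
        this.formatVersion = formatVersion;
    }
}

/** Hypothetical model of the snapshot written together with the previous serializer. */
final class ModelSerializerSnapshot {
    private final int writtenFormat;          // format the existing state bytes are in
    private final Set<Integer> migratableTo;  // formats the old data can be rewritten into

    ModelSerializerSnapshot(int writtenFormat, Set<Integer> migratableTo) {
        this.writtenFormat = writtenFormat;
        this.migratableTo = migratableTo;
    }

    /** Mirrors the role of resolveSchemaCompatibility: compare the new serializer with the written format. */
    CompatResult resolveSchemaCompatibility(ModelSerializer newSerializer) {
        if (newSerializer.formatVersion == writtenFormat) {
            return CompatResult.COMPATIBLE_AS_IS;
        }
        if (migratableTo.contains(newSerializer.formatVersion)) {
            // Old bytes can be read with the previous serializer and re-written with the new one.
            return CompatResult.COMPATIBLE_AFTER_MIGRATION;
        }
        return CompatResult.INCOMPATIBLE;
    }
}
```

Suppose a snapshot was written in format 1 and can migrate to format 2. It then resolves to `COMPATIBLE_AS_IS` for a format-1 serializer, `COMPATIBLE_AFTER_MIGRATION` for format 2, and `INCOMPATIBLE` for format 3.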
##########
docs/content.zh/docs/dev/datastream/fault-tolerance/custom_serialization.md:
##########
@@ -112,109 +99,72 @@ public abstract class TypeSerializer<T> {
}
```
-A serializer's `TypeSerializerSnapshot` is a point-in-time information that
serves as the single source of truth about
-the state serializer's write schema, as well as any additional information
mandatory to restore a serializer that
-would be identical to the given point-in-time. The logic about what should be
written and read at restore time
-as the serializer snapshot is defined in the `writeSnapshot` and
`readSnapshot` methods.
-
-Note that the snapshot's own write schema may also need to change over time
(e.g. when you wish to add more information
-about the serializer to the snapshot). To facilitate this, snapshots are
versioned, with the current version
-number defined in the `getCurrentVersion` method. On restore, when the
serializer snapshot is read from savepoints,
-the version of the schema in which the snapshot was written in will be
provided to the `readSnapshot` method so that
-the read implementation can handle different versions.
-
-At restore time, the logic that detects whether or not the new serializer's
schema has changed should be implemented in
-the `resolveSchemaCompatibility` method. When previous registered state is
registered again with new serializers in the
-restored execution of an operator, the new serializer is provided to the
previous serializer's snapshot via this method.
-This method returns a `TypeSerializerSchemaCompatibility` representing the
result of the compatibility resolution,
-which can be one of the following:
-
- 1. **`TypeSerializerSchemaCompatibility.compatibleAsIs()`**: this result
signals that the new serializer is compatible,
- meaning that the new serializer has identical schema with the previous
serializer. It is possible that the new
- serializer has been reconfigured in the `resolveSchemaCompatibility` method
so that it is compatible.
- 2. **`TypeSerializerSchemaCompatibility.compatibleAfterMigration()`**: this
result signals that the new serializer has a
- different serialization schema, and it is possible to migrate from the old
schema by using the previous serializer
- (which recognizes the old schema) to read bytes into state objects, and then
rewriting the object back to bytes with
- the new serializer (which recognizes the new schema).
- 3. **`TypeSerializerSchemaCompatibility.incompatible()`**: this result
signals that the new serializer has a
- different serialization schema, but it is not possible to migrate from the
old schema.
-
-The last bit of detail is how the previous serializer is obtained in the case
that migration is required.
-Another important role of a serializer's `TypeSerializerSnapshot` is that it
serves as a factory to restore
-the previous serializer. More specifically, the `TypeSerializerSnapshot`
should implement the `restoreSerializer` method
-to instantiate a serializer instance that recognizes the previous serializer's
schema and configuration, and can therefore
-safely read data written by the previous serializer.
-
-### How Flink interacts with the `TypeSerializer` and `TypeSerializerSnapshot`
abstractions
-
-To wrap up, this section concludes how Flink, or more specifically the state
backends, interact with the
-abstractions. The interaction is slightly different depending on the state
backend, but this is orthogonal
-to the implementation of state serializers and their serializer snapshots.
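+A serializer's `TypeSerializerSnapshot` contains the serializer's structure information, as well as any additional information required to restore the serializer. The logic for writing and reading the serializer snapshot is implemented in `writeSnapshot` and `readSnapshot`.
+
+Note that the format of the snapshot itself may also need to change over time (e.g., when more information about the serializer is added to the snapshot). To facilitate this, snapshots are versioned, and the current version is returned by `getCurrentVersion`. On restore, after the snapshot is read from the savepoint, the version in which it was written is passed to `readSnapshot`, so that the implementation can handle each version accordingly.
+
+On restore, the logic that detects whether the serializer's format has changed should be implemented in `resolveSchemaCompatibility`, which receives the new serializer as its argument. The method returns a `TypeSerializerSchemaCompatibility` representing the compatibility result, which can be one of the following:
+ 1. **`TypeSerializerSchemaCompatibility.compatibleAsIs()`**: this result indicates that the new serializer is compatible, meaning that it has the same format as the previous serializer. The new serializer may also have been reconfigured in `resolveSchemaCompatibility` to make it compatible.
+ 2. **`TypeSerializerSchemaCompatibility.compatibleAfterMigration()`**: this result indicates that the old and new serializers have different formats, but the state can be migrated by deserializing it with the previous serializer and then serializing it again with the new serializer.
+ 3. **`TypeSerializerSchemaCompatibility.incompatible()`**: this result indicates that the old and new serializers have different formats and that migration is not possible.
+
+The last detail is how the previous serializer is obtained when state migration is required. Another important role of `TypeSerializerSnapshot` is to act as a factory for the previous serializer. More specifically, `TypeSerializerSnapshot` should implement the `restoreSerializer` method, which returns a serializer that can safely read data written by the previous serializer.
+
+### How Flink interacts with `TypeSerializer` and `TypeSerializerSnapshot`
+
+To wrap up, this section describes how Flink, or more specifically the state backends, interact with these abstractions. The interaction differs slightly depending on the state backend, but this is orthogonal to the implementation of serializers and their snapshots.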
#### Off-heap state backends (e.g. `RocksDBStateBackend`)
- 1. **Register new state with a state serializer that has schema _A_**
- - the registered `TypeSerializer` for the state is used to read / write
state on every state access.
- - State is written in schema *A*.
+1. **Register a new state with a serializer that has format _A_**
+ - The registered `TypeSerializer` is used for every state access (read/write).
+ - The state is serialized in format *A*.
2. **Take a savepoint**
- - The serializer snapshot is extracted via the
`TypeSerializer#snapshotConfiguration` method.
- - The serializer snapshot is written to the savepoint, as well as the
already-serialized state bytes (with schema *A*).
- 3. **Restored execution re-accesses restored state bytes with new state
serializer that has schema _B_**
- - The previous state serializer's snapshot is restored.
- - State bytes are not deserialized on restore, only loaded back to the state
backends (therefore, still in schema *A*).
- - Upon receiving the new serializer, it is provided to the restored previous
serializer's snapshot via the
- `TypeSerializer#resolveSchemaCompatibility` to check for schema
compatibility.
- 4. **Migrate state bytes in backend from schema _A_ to schema _B_**
- - If the compatibility resolution reflects that the schema has changed and
migration is possible, schema migration is
- performed. The previous state serializer which recognizes schema _A_ will be
obtained from the serializer snapshot, via
- `TypeSerializerSnapshot#restoreSerializer()`, and is used to deserialize
state bytes to objects, which in turn
- are re-written again with the new serializer, which recognizes schema _B_
to complete the migration. All entries
- of the accessed state is migrated all-together before processing continues.
- - If the resolution signals incompatibility, then the state access fails
with an exception.
-
+ - The serializer snapshot is obtained via `TypeSerializer#snapshotConfiguration`.
+ - The serializer snapshot is written to the savepoin together with the serialized state data.
Review Comment:
```suggestion
- The serializer snapshot is written to the savepoint together with the serialized state data.
```
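The backend steps in this hunk can be sketched without Flink: restore the previous serializer from its snapshot, deserialize the old bytes with it, and re-serialize them with the new serializer. All types here (`Counter`, `CounterSerializer`, `CounterSnapshotA`, `StateMigration`) are hypothetical. `restoreSerializer()` only mirrors the factory role of `TypeSerializerSnapshot#restoreSerializer`, and the added `lastUpdate` field is an assumed example of a format change from _A_ to _B_.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical state type; format B added the lastUpdate field. */
final class Counter {
    final int count;
    final long lastUpdate;

    Counter(int count, long lastUpdate) {
        this.count = count;
        this.lastUpdate = lastUpdate;
    }
}

/** Hypothetical minimal serializer interface (not Flink's TypeSerializer). */
interface CounterSerializer {
    void serialize(Counter value, DataOutputStream out) throws IOException;

    Counter deserialize(DataInputStream in) throws IOException;
}

/** Format A: only the count is written. */
final class CounterSerializerA implements CounterSerializer {
    public void serialize(Counter value, DataOutputStream out) throws IOException {
        out.writeInt(value.count);
    }

    public Counter deserialize(DataInputStream in) throws IOException {
        return new Counter(in.readInt(), 0L); // field absent in format A: default to 0
    }
}

/** Format B: count followed by lastUpdate. */
final class CounterSerializerB implements CounterSerializer {
    public void serialize(Counter value, DataOutputStream out) throws IOException {
        out.writeInt(value.count);
        out.writeLong(value.lastUpdate);
    }

    public Counter deserialize(DataInputStream in) throws IOException {
        return new Counter(in.readInt(), in.readLong());
    }
}

/** Hypothetical snapshot of format A, acting as a factory for the previous serializer. */
final class CounterSnapshotA {
    CounterSerializer restoreSerializer() {
        return new CounterSerializerA();
    }
}

final class StateMigration {
    static byte[] write(Counter value, CounterSerializer serializer) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            serializer.serialize(value, out);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Counter read(byte[] data, CounterSerializer serializer) {
        try {
            return serializer.deserialize(new DataInputStream(new ByteArrayInputStream(data)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Migration: read with the restored previous serializer, re-write with the new one. */
    static byte[] migrate(byte[] oldBytes, CounterSnapshotA previousSnapshot, CounterSerializer newSerializer) {
        Counter value = read(oldBytes, previousSnapshot.restoreSerializer());
        return write(value, newSerializer);
    }
}
```

Writing `new Counter(7, 99L)` in format A produces 4 bytes, and the `lastUpdate` value is dropped. Migrating those bytes produces 12 bytes in format B, with `lastUpdate` defaulted to 0.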
##########
docs/content.zh/docs/dev/datastream/fault-tolerance/custom_serialization.md:
##########
@@ -366,84 +293,44 @@ public final class GenericArraySerializerSnapshot<C>
extends CompositeTypeSerial
```
</div>
-There are two important things to notice in the above code snippet. First of
all, since this
-`CompositeTypeSerializerSnapshot` implementation has outer snapshot
information that is written as part of the snapshot,
-the outer snapshot version, as defined by `getCurrentOuterSnapshotVersion()`,
must be upticked whenever the
-serialization format of the outer snapshot information changes.
-
-Second of all, notice how we avoid using Java serialization when writing the
component class, by only writing
-the classname and dynamically loading it when reading back the snapshot.
Avoiding Java serialization for writing
-contents of serializer snapshots is in general a good practice to follow. More
details about this is covered in the
-next section.
-
-## Implementation notes and best practices
-
-#### 1. Flink restores serializer snapshots by instantiating them with their
classname
-
-A serializer's snapshot, being the single source of truth for how a registered
state was serialized, serves as an
-entry point to reading state in savepoints. In order to be able to restore and
access previous state, the previous state
-serializer's snapshot must be able to be restored.
-
-Flink restores serializer snapshots by first instantiating the
`TypeSerializerSnapshot` with its classname (written
-along with the snapshot bytes). Therefore, to avoid being subject to
unintended classname changes or instantiation
-failures, `TypeSerializerSnapshot` classes should:
-
- - avoid being implemented as anonymous classes or nested classes,
- - have a public, nullary constructor for instantiation
-
-#### 2. Avoid sharing the same `TypeSerializerSnapshot` class across different
serializers
-
-Since schema compatibility checks goes through the serializer snapshots,
having multiple serializers returning
-the same `TypeSerializerSnapshot` class as their snapshot would complicate the
implementation for the
-`TypeSerializerSnapshot#resolveSchemaCompatibility` and
`TypeSerializerSnapshot#restoreSerializer()` method.
-
-This would also be a bad separation of concerns; a single serializer's
serialization schema,
-configuration, as well as how to restore it, should be consolidated in its own
dedicated `TypeSerializerSnapshot` class.
-
-#### 3. Avoid using Java serialization for serializer snapshot content
-
-Java serialization should not be used at all when writing contents of a
persisted serializer snapshot.
-Take for example, a serializer which needs to persist a class of its target
type as part of its snapshot.
-Information about the class should be persisted by writing the class name,
instead of directly serializing the class
-using Java. When reading the snapshot, the class name is read, and used to
dynamically load the class via the name.
-
-This practice ensures that serializer snapshots can always be safely read. In
the above example, if the type class
-was persisted using Java serialization, the snapshot may no longer be readable
once the class implementation has changed
-and is no longer binary compatible according to Java serialization specifics.
-
-## Migrating from deprecated serializer snapshot APIs before Flink 1.7
-
-This section is a guide for API migration from serializers and serializer
snapshots that existed before Flink 1.7.
-
-Before Flink 1.7, serializer snapshots were implemented as a
`TypeSerializerConfigSnapshot` (which is now deprecated,
-and will eventually be removed in the future to be fully replaced by the new
`TypeSerializerSnapshot` interface).
-Moreover, the responsibility of serializer schema compatibility checks lived
within the `TypeSerializer`,
-implemented in the
`TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` method.
-
-Another major difference between the new and old abstractions is that the
deprecated `TypeSerializerConfigSnapshot`
-did not have the capability of instantiating the previous serializer.
Therefore, in the case where your serializer
-still returns a subclass of `TypeSerializerConfigSnapshot` as its snapshot,
the serializer instance itself will always
-be written to savepoints using Java serialization so that the previous
serializer may be available at restore time.
-This is very undesirable, since whether or not restoring the job will be
successful is susceptible to availability
-of the previous serializer's class, or in general, whether or not the
serializer instance can be read back at restore
-time using Java serialization. This means that you be limited to the same
serializer for your state,
-and could be problematic once you want to upgrade serializer classes or
perform schema migration.
-
-To be future-proof and have flexibility to migrate your state serializers and
schema, it is highly recommended to
-migrate from the old abstractions. The steps to do this is as follows:
-
- 1. Implement a new subclass of `TypeSerializerSnapshot`. This will be the new
snapshot for your serializer.
- 2. Return the new `TypeSerializerSnapshot` as the serializer snapshot for
your serializer in the
- `TypeSerializer#snapshotConfiguration()` method.
- 3. Restore the job from the savepoint that existed before Flink 1.7, and then
take a savepoint again.
- Note that at this step, the old `TypeSerializerConfigSnapshot` of the
serializer must still exist in the classpath,
- and the implementation for the
`TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` method must
not be
- removed. The purpose of this process is to replace the
`TypeSerializerConfigSnapshot` written in old savepoints
- with the newly implemented `TypeSerializerSnapshot` for the serializer.
- 4. Once you have a savepoint taken with Flink 1.7, the savepoint will contain
`TypeSerializerSnapshot` as the
- state serializer snapshot, and the serializer instance will no longer be
written in the savepoint.
- At this point, it is now safe to remove all implementations of the old
abstraction (remove the old
- `TypeSerializerConfigSnapshot` implementation as will as the
- `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` from the
serializer).
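+There are two important things to notice in the code snippet above. First, the outer serializer's additional information is persisted as part of the snapshot, so whenever the serialization format of that information changes, the value returned by `getCurrentOuterSnapshotVersion()` must be incremented accordingly.
+
+Second, notice how we avoid Java serialization by persisting only the class name and loading the class dynamically on restore. Avoiding Java serialization for snapshot contents is generally good practice; more details are given in the next section.
+
+## Implementation notes and best practices
+
+#### 1. Flink instantiates serializer snapshots by their class name
+
+A serializer's snapshot is the single source of truth for how a state was serialized. To be able to restore and access previous state, the previous serializer's snapshot must be restorable as well.
+
+Flink instantiates the concrete `TypeSerializerSnapshot` by its class name (which is written together with the snapshot). Therefore, to avoid unintended class name changes or instantiation failures, `TypeSerializerSnapshot` classes should:
+ - not be implemented as anonymous or nested classes;
+ - have a public no-argument constructor.
+
+#### 2. Avoid sharing the same `TypeSerializerSnapshot` class across different serializers
+
+Since format compatibility checks go through the serializer snapshots, having multiple serializers return the same `TypeSerializerSnapshot` class complicates the implementation of `TypeSerializerSnapshot#resolveSchemaCompatibility` and `TypeSerializerSnapshot#restoreSerializer()`.
+
+Sharing a `TypeSerializerSnapshot` is also a poor separation of concerns: each serializer's format, configuration, and restore logic should be consolidated in its own dedicated `TypeSerializerSnapshot` class.
+
+#### 3. Avoid using Java serialization for serializer snapshot content
+
+Java serialization should not be used when writing the contents of a serializer snapshot. For example, if a serializer needs to persist the class of its target type as part of its snapshot, it should write the class name instead of serializing the class with Java serialization. When the snapshot is read, the class name is read back and used to load the class dynamically.
+
+This practice ensures that serializer snapshots can always be read safely. In the example above, if the type class were persisted with Java serialization, the snapshot might no longer be readable once the class implementation changes and is no longer binary compatible under Java serialization rules.
+
+## Migrating from serializer snapshot APIs before Flink 1.7
+
+This section describes how to migrate serializers and serializer snapshots that existed before Flink 1.7.
+
+Before Flink 1.7, serializer snapshots were implemented as `TypeSerializerConfigSnapshot` (now deprecated, and to be removed in the future once it is fully replaced by `TypeSerializerSnapshot`). Moreover, serializer format compatibility checks were performed in `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)`.
+
+Another major difference between the old and new abstractions is that the deprecated `TypeSerializerConfigSnapshot` cannot instantiate the previous serializer. Therefore, if a serializer still returns a subclass of `TypeSerializerConfigSnapshot` as its snapshot, the serializer instance itself is always written to the savepoint with Java serialization, so that the previous serializer is available at restore time. This is highly undesirable, because whether the job can be restored successfully then depends on whether the previous serializer can be read back from the savepoint with Java serialization. This effectively locks the state to the same serializer and can cause problems once you want to upgrade serializer classes or perform schema migration.
+
+To be future-proof and keep the flexibility to upgrade serializers, it is strongly recommended to migrate to the new API. The steps are as follows:
+ 1. Implement a new subclass of `TypeSerializerSnapshot` as the snapshot class of your serializer.
+ 2. Return the new `TypeSerializerSnapshot` from the `TypeSerializer#snapshotConfiguration()` method.
+ 3. Restore the job from the savepoint taken before Flink 1.7, then take a new savepoint. Note that at this step, the old `TypeSerializerConfigSnapshot` must still be on the classpath, and the `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` implementation must not be removed. The purpose of this step is to replace the `TypeSerializerConfigSnapshot` written in the old savepoint with the new `TypeSerializerSnapshot`.
+ 4. A savepoint taken with a newer version (>= 1.7) contains `TypeSerializerSnapshot` as the serializer snapshot, and the serializer instance no longer needs to be written to the savepoint. At this point, it is safe to remove the `TypeSerializerConfigSnapshot` subclass and the `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` implementation.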
Review Comment:
```suggestion
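4. Once a savepoint has been taken with Flink 1.7, it contains `TypeSerializerSnapshot` as the state serializer snapshot, and the serializer instance is no longer written to the savepoint. At this point, it is safe to remove the `TypeSerializerConfigSnapshot` subclass and the `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` implementation.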
```
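The best practices in this hunk can be sketched in plain Java. One is to instantiate snapshots by class name through a public no-argument constructor. The other is to persist classes by name rather than with Java serialization. `RestorableSnapshot`, `GoodSnapshot`, `BadSnapshot` and `SnapshotLoader` are hypothetical. As a simplification, the sketch assumes that restoring a snapshot comes down to `Class.forName` plus a no-argument constructor call; it is not Flink's actual restore code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical marker for a snapshot type that can be restored by name. */
interface RestorableSnapshot {}

/** Top-level class with a public no-arg constructor: can be re-created from its class name. */
final class GoodSnapshot implements RestorableSnapshot {
    public GoodSnapshot() {}
}

/** Has no no-arg constructor, so reflective instantiation by name fails. */
final class BadSnapshot implements RestorableSnapshot {
    BadSnapshot(int unused) {}
}

final class SnapshotLoader {
    /** Re-creates a snapshot object from the class name stored next to the snapshot bytes. */
    static RestorableSnapshot instantiate(String className, ClassLoader classLoader) {
        try {
            Class<?> clazz = Class.forName(className, true, classLoader);
            return (RestorableSnapshot) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate snapshot class " + className, e);
        }
    }

    /** Persists a class by its name instead of Java-serializing it. */
    static byte[] writeClassName(Class<?> type) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(type.getName());
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads the class name back and resolves it dynamically. */
    static Class<?> readClassName(byte[] data, ClassLoader classLoader) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            return Class.forName(in.readUTF(), false, classLoader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Class no longer available: " + e.getMessage(), e);
        }
    }
}
```

Because only the name is stored, the class can change its implementation freely as long as its name stays the same. With Java serialization, the stored bytes would also have to stay binary compatible.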
##########
docs/content.zh/docs/dev/datastream/fault-tolerance/custom_serialization.md:
##########
@@ -69,30 +64,22 @@ checkpointedState =
getRuntimeContext.getListState(descriptor)
{{< /tab >}}
{{< /tabs >}}
-## State serializers and schema evolution
+## State serializers and schema evolution (格式演进)
-This section explains the user-facing abstractions related to state
serialization and schema evolution, and necessary
-internal details about how Flink interacts with these abstractions.
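+This section introduces the user-facing abstractions related to state serialization and structure upgrade (结构升级), as well as some internal details of how Flink interacts with these abstractions.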
-When restoring from savepoints, Flink allows changing the serializers used to
read and write previously registered state,
-so that users are not locked in to any specific serialization schema. When
state is restored, a new serializer will be
-registered for the state (i.e., the serializer that comes with the
`StateDescriptor` used to access the state in the
-restored job). This new serializer may have a different schema than that of
the previous serializer. Therefore, when
-implementing state serializers, besides the basic logic of reading / writing
data, another important thing to keep in
-mind is how the serialization schema can be changed in the future.
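+When restoring from a savepoint, Flink allows the state serializer to be changed, thereby supporting structure upgrade (结构升级). After state is restored, it is registered with the new serializer (i.e., the serializer specified by the `StateDescriptor` in the restored job).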
Review Comment:
```suggestion
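When restoring from a savepoint, Flink allows the state serializer to be changed, thereby supporting schema evolution (格式演进). After state is restored, it is registered with the new serializer (i.e., the serializer specified by the `StateDescriptor` in the restored job).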
```
Ditto
##########
docs/content.zh/docs/dev/datastream/fault-tolerance/custom_serialization.md:
##########
@@ -24,23 +24,18 @@ specific language governing permissions and limitations
under the License.
-->
-# Custom Serialization for Managed State
+# Custom Serializers
-This page is targeted as a guideline for users who require the use of custom
serialization for their state, covering
-how to provide a custom state serializer as well as guidelines and best
practices for implementing serializers that allow
-state schema evolution.
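+This page provides guidance for users who need custom serializers, covering how to implement a custom serializer as well as best guidelines for implementing serializers that support state upgrade (状态升级).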
Review Comment:
```suggestion
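This page provides guidance for users who need custom serializers, covering how to implement a custom serializer as well as best practices for implementing serializers that support state schema evolution (状态格式演进).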
```
##########
docs/content.zh/docs/dev/datastream/fault-tolerance/custom_serialization.md:
##########
@@ -69,30 +64,22 @@ checkpointedState =
getRuntimeContext.getListState(descriptor)
{{< /tab >}}
{{< /tabs >}}
-## State serializers and schema evolution
+## State serializers and schema evolution (格式演进)
-This section explains the user-facing abstractions related to state
serialization and schema evolution, and necessary
-internal details about how Flink interacts with these abstractions.
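+This section introduces the user-facing abstractions related to state serialization and structure upgrade (结构升级), as well as some internal details of how Flink interacts with these abstractions.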
-When restoring from savepoints, Flink allows changing the serializers used to
read and write previously registered state,
-so that users are not locked in to any specific serialization schema. When
state is restored, a new serializer will be
-registered for the state (i.e., the serializer that comes with the
`StateDescriptor` used to access the state in the
-restored job). This new serializer may have a different schema than that of
the previous serializer. Therefore, when
-implementing state serializers, besides the basic logic of reading / writing
data, another important thing to keep in
-mind is how the serialization schema can be changed in the future.
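+When restoring from a savepoint, Flink allows the state serializer to be changed, thereby supporting structure upgrade (结构升级). After state is restored, it is registered with the new serializer (i.e., the serializer specified by the `StateDescriptor` in the restored job).
+The new serializer may have a different structure (结构) from the previous one. Therefore, when implementing a state serializer, besides correctly handling the basic logic of reading and writing data, another important consideration is how to support future structure upgrades (结构升级) of the state.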
Review Comment:
```suggestion
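The new serializer may have a different format (格式, i.e. schema) from the previous one. Therefore, when implementing a state serializer, besides correctly handling the basic logic of reading and writing data, another important consideration is how to support future schema evolution (格式演进) of the state.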
```
Ditto
##########
docs/content.zh/docs/dev/datastream/fault-tolerance/custom_serialization.md:
##########
@@ -112,109 +99,72 @@ public abstract class TypeSerializer<T> {
}
```
-A serializer's `TypeSerializerSnapshot` is a point-in-time information that
serves as the single source of truth about
-the state serializer's write schema, as well as any additional information
mandatory to restore a serializer that
-would be identical to the given point-in-time. The logic about what should be
written and read at restore time
-as the serializer snapshot is defined in the `writeSnapshot` and
`readSnapshot` methods.
-
-Note that the snapshot's own write schema may also need to change over time
(e.g. when you wish to add more information
-about the serializer to the snapshot). To facilitate this, snapshots are
versioned, with the current version
-number defined in the `getCurrentVersion` method. On restore, when the
serializer snapshot is read from savepoints,
-the version of the schema in which the snapshot was written in will be
provided to the `readSnapshot` method so that
-the read implementation can handle different versions.
-
-At restore time, the logic that detects whether or not the new serializer's
schema has changed should be implemented in
-the `resolveSchemaCompatibility` method. When previous registered state is
registered again with new serializers in the
-restored execution of an operator, the new serializer is provided to the
previous serializer's snapshot via this method.
-This method returns a `TypeSerializerSchemaCompatibility` representing the
result of the compatibility resolution,
-which can be one of the following:
-
- 1. **`TypeSerializerSchemaCompatibility.compatibleAsIs()`**: this result
signals that the new serializer is compatible,
- meaning that the new serializer has identical schema with the previous
serializer. It is possible that the new
- serializer has been reconfigured in the `resolveSchemaCompatibility` method
so that it is compatible.
- 2. **`TypeSerializerSchemaCompatibility.compatibleAfterMigration()`**: this
result signals that the new serializer has a
- different serialization schema, and it is possible to migrate from the old
schema by using the previous serializer
- (which recognizes the old schema) to read bytes into state objects, and then
rewriting the object back to bytes with
- the new serializer (which recognizes the new schema).
- 3. **`TypeSerializerSchemaCompatibility.incompatible()`**: this result
signals that the new serializer has a
- different serialization schema, but it is not possible to migrate from the
old schema.
-
-The last bit of detail is how the previous serializer is obtained in the case
that migration is required.
-Another important role of a serializer's `TypeSerializerSnapshot` is that it
serves as a factory to restore
-the previous serializer. More specifically, the `TypeSerializerSnapshot`
should implement the `restoreSerializer` method
-to instantiate a serializer instance that recognizes the previous serializer's
schema and configuration, and can therefore
-safely read data written by the previous serializer.
-
-### How Flink interacts with the `TypeSerializer` and `TypeSerializerSnapshot`
abstractions
-
-To wrap up, this section concludes how Flink, or more specifically the state
backends, interact with the
-abstractions. The interaction is slightly different depending on the state
backend, but this is orthogonal
-to the implementation of state serializers and their serializer snapshots.
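+A serializer's `TypeSerializerSnapshot` contains the serializer's structure information, as well as any additional information required to restore the serializer. The logic for writing and reading the serializer snapshot is implemented in `writeSnapshot` and `readSnapshot`.
+
+Note that the format of the snapshot itself may also need to change over time (e.g., when more information about the serializer is added to the snapshot). To facilitate this, snapshots are versioned, and the current version is returned by `getCurrentVersion`. On restore, after the snapshot is read from the savepoint, the version in which it was written is passed to `readSnapshot`, so that the implementation can handle each version accordingly.
+
+On restore, the logic that detects whether the serializer's format has changed should be implemented in `resolveSchemaCompatibility`, which receives the new serializer as its argument. The method returns a `TypeSerializerSchemaCompatibility` representing the compatibility result, which can be one of the following:
+ 1. **`TypeSerializerSchemaCompatibility.compatibleAsIs()`**: this result indicates that the new serializer is compatible, meaning that it has the same format as the previous serializer. The new serializer may also have been reconfigured in `resolveSchemaCompatibility` to make it compatible.
+ 2. **`TypeSerializerSchemaCompatibility.compatibleAfterMigration()`**: this result indicates that the old and new serializers have different formats, but the state can be migrated by deserializing it with the previous serializer and then serializing it again with the new serializer.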
Review Comment:
```suggestion
2. **`TypeSerializerSchemaCompatibility.compatibleAfterMigration()`**: 该结果表示新旧序列化器的格式不同,不过可以使用之前的序列化器反序列化,然后再用新序列化器进行序列化,从而实现迁移。
```
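The three outcomes listed in this hunk, and the read-with-old / write-with-new path behind `compatibleAfterMigration()`, can be illustrated with a small self-contained Java model. This is a sketch under stated assumptions. `Compatibility`, `IntSerializer`, `DecimalSerializer`, `HexSerializer`, `IntSerializerSnapshot` and `StateMigration` are hypothetical names, and a serializer's schema is reduced to a format label. None of this is Flink's actual `TypeSerializer` / `TypeSerializerSnapshot` API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins: they model the concepts from the docs but are NOT
// Flink's TypeSerializer / TypeSerializerSnapshot API.
enum Compatibility { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

interface IntSerializer {
    String format();                 // identifies the serialization schema
    String serialize(int value);
    int deserialize(String bytes);
}

/** Old schema: values written as decimal strings. */
final class DecimalSerializer implements IntSerializer {
    public String format() { return "decimal"; }
    public String serialize(int value) { return Integer.toString(value); }
    public int deserialize(String bytes) { return Integer.parseInt(bytes); }
}

/** New schema: values written as hexadecimal strings. */
final class HexSerializer implements IntSerializer {
    public String format() { return "hex"; }
    public String serialize(int value) { return Integer.toHexString(value); }
    public int deserialize(String bytes) { return Integer.parseUnsignedInt(bytes, 16); }
}

/** Records which schema the state was written with. */
final class IntSerializerSnapshot {
    private final String writtenFormat;

    IntSerializerSnapshot(String writtenFormat) { this.writtenFormat = writtenFormat; }

    /** Compares the serializer of the restored job against the recorded schema. */
    Compatibility resolveSchemaCompatibility(IntSerializer newSerializer) {
        if (newSerializer.format().equals(writtenFormat)) {
            return Compatibility.COMPATIBLE_AS_IS;
        }
        // A different schema can only be migrated if the old bytes are still readable.
        return restoreSerializer() != null
                ? Compatibility.COMPATIBLE_AFTER_MIGRATION
                : Compatibility.INCOMPATIBLE;
    }

    /** Factory for a serializer that understands the previously written schema. */
    IntSerializer restoreSerializer() {
        switch (writtenFormat) {
            case "decimal": return new DecimalSerializer();
            case "hex": return new HexSerializer();
            default: return null;    // unknown schema: old bytes cannot be read
        }
    }
}

final class StateMigration {
    /** Reads each entry with the previous serializer and rewrites it with the new one. */
    static List<String> migrate(List<String> oldBytes, IntSerializer previous, IntSerializer next) {
        List<String> migrated = new ArrayList<>();
        for (String bytes : oldBytes) {
            migrated.add(next.serialize(previous.deserialize(bytes)));
        }
        return migrated;
    }
}
```

For example, a snapshot written with the `decimal` format resolves to `COMPATIBLE_AFTER_MIGRATION` against a `HexSerializer`. Migrating `["10", "255"]` through `restoreSerializer()` then yields `["a", "ff"]`.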
##########
docs/content.zh/docs/dev/datastream/fault-tolerance/custom_serialization.md:
##########
@@ -112,109 +99,72 @@ public abstract class TypeSerializer<T> {
}
```
+ 3. **`TypeSerializerSchemaCompatibility.incompatible()`**: 该结果表示新旧序列化器的格式不同,且无法进行迁移。
+
+最后一点细节在于需要进行 state 迁移时如何获取之前的序列化器。`TypeSerializerSnapshot` 的另一个重要作用是可以构造之前的序列化器。更具体的说,`TypeSerializerSnapshot` 应该实现 `restoreSerializer` 方法,该方法返回一个可以安全读取之前序列化器写出数据的序列化器。
+
+### Flink 如何与 `TypeSerializer` 及 `TypeSerializerSnapshot` 交互
+
+总结一下,本节阐述了 Flink,或者更具体的说状态后端,是如何与这些抽象交互。根据状态后端的不同,交互方式略有不同,但这与序列化器以及序列化器快照的实现是正交的。
#### Off-heap state backends (e.g. `RocksDBStateBackend`)
- 1. **Register new state with a state serializer that has schema _A_**
- - the registered `TypeSerializer` for the state is used to read / write state on every state access.
- - State is written in schema *A*.
+1. **以拥有格式 _A_ 的序列化器注册一个新的 state**
+ - state 的每次访问(读/写)都使用注册的 `TypeSerializer`.
+ - state 以格式 *A* 进行序列化.
2. **Take a savepoint**
Review Comment:
```suggestion
2. **创建一个保存点(savepoint)**
```
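Step 2 (taking a savepoint) is where the serializer snapshot is written next to the state bytes. As the translated paragraphs quoted above describe, snapshots carry a version (`getCurrentVersion`) so that `readSnapshot` can handle older layouts. The following is a hypothetical Java sketch of that versioning idea. `PointSerializerSnapshot`, `writeVersioned` and `readVersioned` are illustrative names, and plain `java.io` streams stand in for Flink's own input/output views, so the signatures are simplified assumptions.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical sketch: the method names echo getCurrentVersion / writeSnapshot /
// readSnapshot from the docs, but the signatures are simplified and plain
// java.io streams replace Flink's own data views. Not Flink's API.
final class PointSerializerSnapshot {
    // Assumed history: version 1 stored only the element type,
    // version 2 added a "nullable" flag.
    private static final int CURRENT_VERSION = 2;

    String elementType = "int";
    boolean nullable = false;

    int getCurrentVersion() { return CURRENT_VERSION; }

    /** Writes the fields of the current snapshot layout. */
    void writeSnapshot(DataOutputStream out) throws IOException {
        out.writeUTF(elementType);
        out.writeBoolean(nullable);
    }

    /** Reads a snapshot written with any supported version. */
    void readSnapshot(int readVersion, DataInputStream in) throws IOException {
        if (readVersion < 1 || readVersion > CURRENT_VERSION) {
            throw new IOException("Unsupported snapshot version: " + readVersion);
        }
        elementType = in.readUTF();
        // Version 1 snapshots never wrote the flag, so keep the default there.
        nullable = readVersion >= 2 && in.readBoolean();
    }

    /** Assumed stand-in for the framework: store the version in front of the snapshot. */
    static byte[] writeVersioned(PointSerializerSnapshot snapshot) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(snapshot.getCurrentVersion());
        snapshot.writeSnapshot(out);
        out.flush();
        return bytes.toByteArray();
    }

    /** Assumed stand-in for the framework: read the version and pass it to readSnapshot. */
    static PointSerializerSnapshot readVersioned(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        PointSerializerSnapshot snapshot = new PointSerializerSnapshot();
        snapshot.readSnapshot(in.readInt(), in);
        return snapshot;
    }
}
```

Because the version travels with the bytes, a snapshot written by the older layout (version 1) can still be read after the class has moved on to version 2.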
##########
docs/content.zh/docs/dev/datastream/fault-tolerance/custom_serialization.md:
##########
@@ -69,30 +64,22 @@ checkpointedState = getRuntimeContext.getListState(descriptor)
{{< /tab >}}
{{< /tabs >}}
-## State serializers and schema evolution
+## 状态序列化器及格式演进
-This section explains the user-facing abstractions related to state serialization and schema evolution, and necessary
-internal details about how Flink interacts with these abstractions.
+本节介绍了 state 序列化以及结构升级相关的面向用户的抽象,以及 Flink 如何与这些抽象交互的一些内部细节。
-When restoring from savepoints, Flink allows changing the serializers used to read and write previously registered state,
-so that users are not locked in to any specific serialization schema. When state is restored, a new serializer will be
-registered for the state (i.e., the serializer that comes with the `StateDescriptor` used to access the state in the
-restored job). This new serializer may have a different schema than that of the previous serializer. Therefore, when
-implementing state serializers, besides the basic logic of reading / writing data, another important thing to keep in
-mind is how the serialization schema can be changed in the future.
+从 savepoint 恢复时,Flink 允许更改 state 的序列化器,从而支持结构升级。state 恢复之后,将使用新的序列化器进行 state 注册(即恢复后作业中 `StateDescriptor` 指定的序列化器),
+新的序列化器可能和之前的序列化器拥有不同的结构。因此,实现 state 序列化器的时候,处理正确处理读写数据的基本逻辑外,另外一个需要重点考虑的是未来如何支持 state 的机构升级。
-When speaking of *schema*, in this context the term is interchangeable between referring to the *data model* of a state
-type and the *serialized binary format* of a state type. The schema, generally speaking, can change for a few cases:
+这里说的 *结构*,既可能指 state 的 *数据模型*,也可能指 *序列化之后的二进制格式"。一般来说,结构在如下情况下会发生改变:
- 1. Data schema of the state type has evolved, i.e. adding or removing a field from a POJO that is used as state.
- 2. Generally speaking, after a change to the data schema, the serialization format of the serializer will need to be upgraded.
- 3. Configuration of the serializer has changed.
-
-In order for the new execution to have information about the *written schema* of state and detect whether or not the
-schema has changed, upon taking a savepoint of an operator's state, a *snapshot* of the state serializer needs to be
-written along with the state bytes. This is abstracted a `TypeSerializerSnapshot`, explained in the next subsection.
+ 1. state 的结构发生变化,比如 POJO 类中增加或删除字段。
+ 2. 一般来说,数据格式变化之后,序列化器的序列化格式需要进行升级。
+ 3. 序列化器的配置发生了变化。
-### The `TypeSerializerSnapshot` abstraction
+为了能在新作业中获取到之前 state 的结构,并检测到模式是否发生变化,在生成 savepoint 的时候,会把 state 序列化器的快照也一并写出。这个快照被抽象为 `TypeSerializerSnapshot`,在下一节中详细描述
Review Comment:
```suggestion
为了能在新作业中获取到之前 state 的格式,并检测到模式是否发生变化,在生成 savepoint 的时候,会把 state 序列化器的快照也一并写出。这个快照被抽象为 `TypeSerializerSnapshot`,在下一节中详细描述
```
Ditto: please use 格式 for "schema" here as well, to keep the translation uniform within the page.
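The paragraph under review says the savepoint stores a snapshot of the state serializer, so the restored job can tell whether the schema changed (for example, after a POJO field was added or removed). Below is a minimal Java sketch of that comparison. It assumes, hypothetically, that the snapshot simply records the ordered field names of the state type. `PojoSchemaCheck` is an illustrative name, not a Flink class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: assumes the serializer snapshot stored in the savepoint
// records the ordered field names of the POJO state type. Not Flink's API.
final class PojoSchemaCheck {
    /** Fields the current POJO has but the written schema did not. */
    static List<String> addedFields(List<String> written, List<String> current) {
        List<String> added = new ArrayList<>(current);
        added.removeAll(written);
        return added;
    }

    /** Fields in the written schema that the current POJO no longer has. */
    static List<String> removedFields(List<String> written, List<String> current) {
        List<String> removed = new ArrayList<>(written);
        removed.removeAll(current);
        return removed;
    }

    /** Order-sensitive: a positional binary format changes even if fields are only reordered. */
    static boolean schemaChanged(List<String> written, List<String> current) {
        return !written.equals(current);
    }
}
```

Without the written field list from the savepoint, none of these checks would be possible. That is why the snapshot has to be persisted together with the state bytes.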
##########
docs/content.zh/docs/dev/datastream/fault-tolerance/custom_serialization.md:
##########
@@ -366,84 +293,44 @@ public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerial
```
</div>
-There are two important things to notice in the above code snippet. First of all, since this
-`CompositeTypeSerializerSnapshot` implementation has outer snapshot information that is written as part of the snapshot,
-the outer snapshot version, as defined by `getCurrentOuterSnapshotVersion()`, must be upticked whenever the
-serialization format of the outer snapshot information changes.
-
-Second of all, notice how we avoid using Java serialization when writing the component class, by only writing
-the classname and dynamically loading it when reading back the snapshot. Avoiding Java serialization for writing
-contents of serializer snapshots is in general a good practice to follow. More details about this is covered in the
-next section.
-
-## Implementation notes and best practices
-
-#### 1. Flink restores serializer snapshots by instantiating them with their classname
-
-A serializer's snapshot, being the single source of truth for how a registered state was serialized, serves as an
-entry point to reading state in savepoints. In order to be able to restore and access previous state, the previous state
-serializer's snapshot must be able to be restored.
-
-Flink restores serializer snapshots by first instantiating the `TypeSerializerSnapshot` with its classname (written
-along with the snapshot bytes). Therefore, to avoid being subject to unintended classname changes or instantiation
-failures, `TypeSerializerSnapshot` classes should:
-
- - avoid being implemented as anonymous classes or nested classes,
- - have a public, nullary constructor for instantiation
-
-#### 2. Avoid sharing the same `TypeSerializerSnapshot` class across different serializers
-
-Since schema compatibility checks goes through the serializer snapshots, having multiple serializers returning
-the same `TypeSerializerSnapshot` class as their snapshot would complicate the implementation for the
-`TypeSerializerSnapshot#resolveSchemaCompatibility` and `TypeSerializerSnapshot#restoreSerializer()` method.
-
-This would also be a bad separation of concerns; a single serializer's serialization schema,
-configuration, as well as how to restore it, should be consolidated in its own dedicated `TypeSerializerSnapshot` class.
-
-#### 3. Avoid using Java serialization for serializer snapshot content
-
-Java serialization should not be used at all when writing contents of a persisted serializer snapshot.
-Take for example, a serializer which needs to persist a class of its target type as part of its snapshot.
-Information about the class should be persisted by writing the class name, instead of directly serializing the class
-using Java. When reading the snapshot, the class name is read, and used to dynamically load the class via the name.
-
-This practice ensures that serializer snapshots can always be safely read. In the above example, if the type class
-was persisted using Java serialization, the snapshot may no longer be readable once the class implementation has changed
-and is no longer binary compatible according to Java serialization specifics.
-
-## Migrating from deprecated serializer snapshot APIs before Flink 1.7
-
-This section is a guide for API migration from serializers and serializer snapshots that existed before Flink 1.7.
-
-Before Flink 1.7, serializer snapshots were implemented as a `TypeSerializerConfigSnapshot` (which is now deprecated,
-and will eventually be removed in the future to be fully replaced by the new `TypeSerializerSnapshot` interface).
-Moreover, the responsibility of serializer schema compatibility checks lived within the `TypeSerializer`,
-implemented in the `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` method.
-
-Another major difference between the new and old abstractions is that the deprecated `TypeSerializerConfigSnapshot`
-did not have the capability of instantiating the previous serializer. Therefore, in the case where your serializer
-still returns a subclass of `TypeSerializerConfigSnapshot` as its snapshot, the serializer instance itself will always
-be written to savepoints using Java serialization so that the previous serializer may be available at restore time.
-This is very undesirable, since whether or not restoring the job will be successful is susceptible to availability
-of the previous serializer's class, or in general, whether or not the serializer instance can be read back at restore
-time using Java serialization. This means that you be limited to the same serializer for your state,
-and could be problematic once you want to upgrade serializer classes or perform schema migration.
-
-To be future-proof and have flexibility to migrate your state serializers and schema, it is highly recommended to
-migrate from the old abstractions. The steps to do this is as follows:
-
- 1. Implement a new subclass of `TypeSerializerSnapshot`. This will be the new snapshot for your serializer.
- 2. Return the new `TypeSerializerSnapshot` as the serializer snapshot for your serializer in the
- `TypeSerializer#snapshotConfiguration()` method.
- 3. Restore the job from the savepoint that existed before Flink 1.7, and then take a savepoint again.
- Note that at this step, the old `TypeSerializerConfigSnapshot` of the serializer must still exist in the classpath,
- and the implementation for the `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` method must not be
- removed. The purpose of this process is to replace the `TypeSerializerConfigSnapshot` written in old savepoints
- with the newly implemented `TypeSerializerSnapshot` for the serializer.
- 4. Once you have a savepoint taken with Flink 1.7, the savepoint will contain `TypeSerializerSnapshot` as the
- state serializer snapshot, and the serializer instance will no longer be written in the savepoint.
- At this point, it is now safe to remove all implementations of the old abstraction (remove the old
- `TypeSerializerConfigSnapshot` implementation as will as the
- `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` from the serializer).
+上述代码片段中,有两个重要的事情需要注意。首先,外部序列化器的额外信息会被持久化到快照中,因此当外部序列化器的额外信息被修改后,`getCurrentOuterSnapshotVersion()` 的返回值也需要随之升级。
+
+另外就是,注意我们如何避免使用 Java 的序列化器,仅持久化类名,并在恢复时进行动态加载。避免使用 Java 的序列化器通常是一个很好的做法,有关这一点的详细描述将在下一节介绍。
+
+## 实现细节及最佳实践
+
+#### 1. Flink 通过类名实例化序列化器的快照
+
+序列化器的快照,是如何序列化 state 的唯一事实标准。为了能够恢复并访问之前的 state,之前序列化器的快照也必须进行恢复。
+
+Flink 会使用类名实例化具体的 `TypeSerializerSnapshot`(类名会和快照一起写出)。因此,为了防止无意的类名改动或者初始化 `TypeSerializerSnapshot` 失败,需要保证:
+ - 不能以匿名类或者内部类的形式实现
+ - 有一个无参的 public 构造函数
+
+#### 2. 避免在多个序列化器之间共用一个 `TypeSerializerSnapshot`
+
+由于格式兼容检查是通过序列化器快照进行的,因此多个序列化器返回同一个 `TypeSerializerSnapshot` 会导致 `TypeSerializerSnapshot#resolveSchemaCompatibility` 和 `TypeSerializerSnapshot#restoreSerializer()` 的实现变复杂。
+
+共用 `TypeSerializerSnapshot` 从分工来说也是不好的选择,每个序列化器的的格式,配置信息以及如何恢复等都应该放到一个单独的 `TypeSerializerSnapshot` 中。
+
+#### 3. 避免使用 Java 默认的序列化器
+
+应该避免使用 Java 序列化器序列化序列化器快照。举例来说,序列化器在序列化一个目标类的时候,应该序列化类名,而不是直接序列化该对象。恢复快照时,通过读取类名,然后动态加载生成具体的序列化器快照对象。
+
+这种方式确保了序列化器快照始终可以被安全地读取。在上面的例子中,如果通过 Java 序列化器把类序列化,那么一旦序列化器快照类的实现进行了修改,会由于二进制兼容问题导致无法读取。
+
+## 从 1.7 之前的版本进行迁移
+
+本节将介绍如何从 Flink 1.7 之前的序列化器以及序列化器快照 API 进行迁移。
+
+在 Flink 1.7 之前,序列化器快照是以 `TypeSerializerConfigSnapshot` 存在(现在已经过时,会在未来被 `TypeSerializerSnapshot` 完全取代并删除)。此外,序列化器格式的兼容性检查在 `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` 中执行。
+
+新旧实现的另一个主要区别是,`TypeSerializerConfigSnapshot` 无法实例化之前的序列化器。因此,序列化器仍然需要返回一个 `TypeSerializerConfigSnapshot` 子类作为快照,序列化器对象本身会通过 Java 序列化器写到 savepoint 中,这样恢复时可能可以从 savepoint 中恢复原来的序列化器。这是非常不可取的,因为作业是否能够恢复成功,取决于能否从 savepoint 正确恢复原来的序列化器。这限制了序列化器的升级,一旦序列化器继续升级就可能出问题。
+
+为将来序列化器能够能够升级,强烈建议迁移使用新 API,具体操作步骤如下所示:
+ 1. 实现 `TypeSerializerSnapshot` 的一个子类作为序列化器的快照类。
+ 2. 在调用 `TypeSerializer#snapshotConfiguration()` 时返回新的序列化器快照类
+ 3. 从就作业的 savepoint 恢复,并执行一个新的 savepoint。注意:这一步中旧的 `TypeSerializerConfigSnapshot` 必须存在于 classpath 中,并且 `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` 不能被删除。这个操作是希望将 savepoint 中的 `TypeSerializerConfigSnapshot` 替换成新的 `TypeSerializerSnapshot`。
Review Comment:
```suggestion
3. 从1.7版本之前的旧作业的 savepoint 恢复,然后触发一个新的 savepoint。注意:这一步中旧的 `TypeSerializerConfigSnapshot` 必须存在于 classpath 中,并且 `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` 不能被删除。这个操作是希望将 savepoint 中的 `TypeSerializerConfigSnapshot` 替换成新的 `TypeSerializerSnapshot`。
```
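Best practice 1 in the hunk above says that Flink re-creates a `TypeSerializerSnapshot` from the class name written with the snapshot. The snapshot class should therefore be a top-level class with a public no-argument constructor. That constraint can be sketched with plain Java reflection. `SnapshotInstantiation`, `MyTypeSerializerSnapshot` and `NoDefaultCtorSnapshot` are hypothetical, and this illustrates the constraint rather than Flink's actual restore code. Anonymous classes are additionally fragile because their compiler-generated binary names (such as `Outer$1`) can shift when the enclosing code changes.

```java
// Hypothetical snapshot class: top-level, stable name, public no-arg constructor.
class MyTypeSerializerSnapshot {
    public MyTypeSerializerSnapshot() {}
}

// Hypothetical counter-example: no nullary constructor, so it cannot be re-created by name.
class NoDefaultCtorSnapshot {
    private final int config;

    NoDefaultCtorSnapshot(int config) { this.config = config; }
}

// Illustrates the restore-by-class-name idea; not Flink's implementation.
final class SnapshotInstantiation {
    /** What a savepoint would need to store next to the snapshot bytes. */
    static String recordedClassName(Object snapshot) {
        return snapshot.getClass().getName();
    }

    /** Re-creates a snapshot from its recorded class name via the nullary constructor. */
    static Object instantiate(String className) throws ReflectiveOperationException {
        Class<?> clazz = Class.forName(className);
        return clazz.getDeclaredConstructor().newInstance();
    }
}
```

Restoring `MyTypeSerializerSnapshot` by its recorded name succeeds. For `NoDefaultCtorSnapshot`, `getDeclaredConstructor()` throws `NoSuchMethodException`, which is the kind of instantiation failure the best practice is meant to avoid.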
##########
docs/content.zh/docs/dev/datastream/fault-tolerance/custom_serialization.md:
##########
@@ -112,109 +99,72 @@ public abstract class TypeSerializer<T> {
}
```
+ 1. **`TypeSerializerSchemaCompatibility.compatibleAsIs()`**: 该结果表明新的序列化器是兼容的,意味着新序列化器和之前的序列化器拥有相同的格式。也可能是在 `resolveSchemaCompatibility` 中对新序列化器继续重新配置所达到的。
Review Comment:
```suggestion
1. **`TypeSerializerSchemaCompatibility.compatibleAsIs()`**: 该结果表明新的序列化器是兼容的,意味着新序列化器和之前的序列化器拥有相同的格式。这也可能是因为在 `resolveSchemaCompatibility` 方法中对新序列化器进行了重新配置,从而实现了兼容。
```
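The suggested wording says `compatibleAsIs()` may also be the result of reconfiguring the new serializer inside `resolveSchemaCompatibility`. Here is a hypothetical Java model of such a reconfiguration. The serializer writes one tag per registered subtype, and it is reordered so that every subtype keeps the tag recorded in the snapshot. `TaggedSerializer` and `TaggedSerializerSnapshot` are illustrative only and do not reproduce Flink's API or its exact set of compatibility results.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not Flink's API: the serializer writes one small integer
// tag per registered subtype, and the tag is the subtype's registration index.
final class TaggedSerializer {
    final List<String> registeredTypes;

    TaggedSerializer(List<String> registeredTypes) {
        this.registeredTypes = new ArrayList<>(registeredTypes);
    }

    int tagOf(String type) { return registeredTypes.indexOf(type); }

    /** Reorders registrations so previously known subtypes keep their old tags. */
    void reconfigure(List<String> previousOrder) {
        List<String> reordered = new ArrayList<>(previousOrder);
        for (String type : registeredTypes) {
            if (!reordered.contains(type)) {
                reordered.add(type);   // newly registered subtypes get tags at the end
            }
        }
        registeredTypes.clear();
        registeredTypes.addAll(reordered);
    }
}

final class TaggedSerializerSnapshot {
    private final List<String> writtenOrder;

    TaggedSerializerSnapshot(List<String> writtenOrder) {
        this.writtenOrder = new ArrayList<>(writtenOrder);
    }

    /**
     * Returns true ("compatible as is") after reconfiguring the new serializer;
     * returns false when a previously written subtype is no longer registered.
     */
    boolean resolveCompatibleAsIs(TaggedSerializer newSerializer) {
        if (!newSerializer.registeredTypes.containsAll(writtenOrder)) {
            return false;
        }
        newSerializer.reconfigure(writtenOrder);
        return true;
    }
}
```

Consider a new job that registers `Dog, Cat, Bird`, restoring state written with `Cat, Dog`. After resolution it ends up with tags `Cat=0, Dog=1, Bird=2`, so existing bytes can be read without migration.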
##########
docs/content.zh/docs/dev/datastream/fault-tolerance/custom_serialization.md:
##########
@@ -366,84 +293,44 @@ public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerial
```
</div>
+为将来序列化器能够能够升级,强烈建议迁移使用新 API,具体操作步骤如下所示:
Review Comment:
```suggestion
为将来序列化器能够继续升级,强烈建议迁移使用新 API,具体操作步骤如下所示:
```
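Best practice 3 quoted earlier is to persist a class by its name and load it dynamically on restore, instead of Java-serializing it. It could look roughly like the following Java sketch. `ClassNameCodec` is a hypothetical helper, not a Flink class. Passing the class loader explicitly is an assumption made so that user classes can be resolved.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical helper (not a Flink class): persists a Class by its name and
// resolves it again on restore, instead of Java-serializing anything.
final class ClassNameCodec {
    static byte[] writeClass(Class<?> clazz) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(clazz.getName());   // only the name goes into the snapshot
        }
        return bytes.toByteArray();
    }

    static Class<?> readClass(byte[] data, ClassLoader loader) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String className = in.readUTF();
            try {
                // Load the class dynamically, without running its static initializers.
                return Class.forName(className, false, loader);
            } catch (ClassNotFoundException e) {
                throw new IOException("Cannot load class " + className, e);
            }
        }
    }
}
```

Only the name is stored, so changing the class's fields or methods does not make the snapshot unreadable. Renaming or moving the class still would.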
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]