[
https://issues.apache.org/jira/browse/FLINK-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339236#comment-16339236
]
ASF GitHub Bot commented on FLINK-8421:
---------------------------------------
GitHub user tzulitai opened a pull request:
https://github.com/apache/flink/pull/5362
[FLINK-8421] [DataStream] Make timer serializers reconfigurable on restore
## What is the purpose of the change
Previously, key and namespace serializers of the `HeapInternalTimerService`
were not reconfigured on restore.
In Flink 1.4.0, we removed the Avro dependency. On restore, if the Avro
dependency was not present, a `DummyAvroKryoSerializerClass` was registered
with Kryo as a placeholder, which altered the base Kryo registrations in the
`KryoSerializer`. This change required a serializer reconfiguration in order
for restores to work. Effectively, this allowed the issue in the
`HeapInternalTimerService` to surface.
This PR fixes this by also writing the `TypeSerializerConfigSnapshot`s of
the key and namespace serializers of the `HeapInternalTimerService` into
savepoints, and using them to reconfigure new serializers on restore.
Since this changes the binary format of the written timer services,
this PR also uses this opportunity to properly make the format versioned.
More details of the change are explained below.
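
To illustrate why a plain equality check is not enough and why a written configuration snapshot helps, here is a minimal toy model. It is not Flink code: `RegistrySerializer`, `snapshotConfiguration` and `reconfigureFrom` are hypothetical names, and the "registration list" only loosely mirrors how Kryo assigns ids to registered classes. The assumption is that data is encoded relative to the writer's registration order, so a new serializer must adopt that order before it can read old data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model only; the names here are hypothetical and not Flink APIs. */
final class ReconfigureSketch {

    /** Encodes a type name as its index in a registration list. */
    static final class RegistrySerializer {
        private List<String> registrations;

        RegistrySerializer(List<String> registrations) {
            this.registrations = new ArrayList<>(registrations);
        }

        int encode(String typeName) {
            int id = registrations.indexOf(typeName);
            if (id < 0) {
                throw new IllegalArgumentException("unregistered type: " + typeName);
            }
            return id;
        }

        String decode(int id) {
            return registrations.get(id);
        }

        /** Configuration snapshot that would be written next to the data. */
        List<String> snapshotConfiguration() {
            return new ArrayList<>(registrations);
        }

        /** Adopts the restored registration order, then appends anything new. */
        void reconfigureFrom(List<String> restored) {
            List<String> merged = new ArrayList<>(restored);
            for (String name : registrations) {
                if (!merged.contains(name)) {
                    merged.add(name);
                }
            }
            registrations = merged;
        }
    }

    public static void main(String[] args) {
        // Writer side: the old job had an extra base registration.
        RegistrySerializer old = new RegistrySerializer(Arrays.asList("AvroType", "MyKey"));
        int written = old.encode("MyKey");
        List<String> snapshot = old.snapshotConfiguration();

        // Reader side: the new job starts with different base registrations.
        RegistrySerializer fresh = new RegistrySerializer(Arrays.asList("MyKey"));
        System.out.println(fresh.encode("MyKey") == written); // ids disagree here
        fresh.reconfigureFrom(snapshot);
        System.out.println(fresh.decode(written)); // readable after reconfiguration
    }
}
```

In this model the two serializers are not equal, yet they are compatible once the new one is reconfigured from the snapshot, which is the distinction this change relies on.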
## Brief change log
- 1bc3cd0: A preliminary migration test that takes a savepoint of a
`WindowOperator` with keys that require serialization using the
`KryoSerializer`. Savepoints were taken for Flink versions 1.2 and 1.3.
Restoring from these savepoints in Flink 1.4 fails, and requires the following
commits to pass.
- b9a1695: Always use the `FailureTolerantObjectInputStream` to read
objects in the `InstantiationUtil.deserializeObject(...)` methods. That special
stream avoids restore failures with `ClassNotFoundException` if Avro is not
present, but there were code paths in the restore process where that special
input stream was not used.
- ff2e6b7 and 8bd955d: Introduced `ByteArrayPrependedInputStream` and
`PostVersionedIOReadableWritable`. These are utility classes that were required
to migrate the serialization format of the timer services from non-versioned to
versioned.
- bcdc1f1: The main change, which adds key / namespace serializer config
snapshots and uses them for serializer reconfiguration on restore. This commit
also makes the format versioned.
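
The migration from a non-versioned to a versioned format can be sketched as follows. This is a hedged illustration of the general technique, not the actual `ByteArrayPrependedInputStream` or `PostVersionedIOReadableWritable` implementation: the class name, the `MAGIC` marker bytes, and the convention that version 0 means "legacy" are all assumptions. The reader consumes a few leading bytes, checks them against a marker that only the versioned format writes, and otherwise puts the consumed bytes back in front of the stream so legacy data can be read from the start.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.Arrays;

/** Hypothetical sketch; not Flink's actual classes. */
final class PostVersionedReadSketch {

    // Assumed marker that only the versioned format writes first.
    static final byte[] MAGIC = {(byte) 0xF1, (byte) 0xCD, (byte) 0x85, (byte) 0x9F};

    /** Detected format version (0 = legacy, non-versioned) plus the payload stream. */
    static final class Result {
        final int version;
        final InputStream payload;

        Result(int version, InputStream payload) {
            this.version = version;
            this.payload = payload;
        }
    }

    /** Writes marker, version, then payload. */
    static byte[] writeVersioned(int version, byte[] payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.write(MAGIC);
        out.writeInt(version);
        out.write(payload);
        out.flush();
        return bytes.toByteArray();
    }

    /** Reads either format; legacy input gets its consumed bytes prepended again. */
    static Result open(InputStream in) throws IOException {
        byte[] head = new byte[MAGIC.length];
        int read = 0;
        while (read < head.length) {
            int n = in.read(head, read, head.length - read);
            if (n < 0) {
                break; // stream shorter than the marker: must be legacy
            }
            read += n;
        }
        if (read == MAGIC.length && Arrays.equals(head, MAGIC)) {
            int version = new DataInputStream(in).readInt();
            return new Result(version, in);
        }
        InputStream restored =
            new SequenceInputStream(new ByteArrayInputStream(head, 0, read), in);
        return new Result(0, restored);
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = {1, 2, 3};
        Result versioned = open(new ByteArrayInputStream(writeVersioned(2, payload)));
        Result legacy = open(new ByteArrayInputStream(payload));
        System.out.println(versioned.version + " " + legacy.version);
    }
}
```

One caveat of this approach is that a legacy payload which happens to begin with the marker bytes would be misdetected, so the marker should be chosen to be implausible as the start of old data.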
## Verifying this change
- The migration test added in 1bc3cd0 will not pass without all fixes.
- Unit tests are added for the new `ByteArrayPrependedInputStream` and
`PostVersionedIOReadableWritable` classes.
- The `testSnapshotAndRestore` and `testSnapshotAndRebalancedRestore` tests
in `HeapInternalTimerServiceTest` are adapted to test both versioned and
previous non-versioned formats.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes /
**no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**YES** / no / don't know)
- The S3 file system connector: (yes / **no** / don't know)
## Documentation
- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs /
JavaDocs / not documented)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tzulitai/flink FLINK-8421
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5362.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5362
----
commit 1bc3cd0214d2d17f19d76a9aa094429730b5ba13
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date: 2018-01-24T16:08:13Z
[FLINK-8421] [DataStream, tests] Add WindowOperator migration test for
Kryo-serialized window keys
commit b9a169535a91d5678ae916d8e54b7e60724a7486
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date: 2018-01-24T16:15:08Z
[FLINK-8421] [core] Let InstantiationUtil.deserializeObject() always use
FailureTolerantObjectInputStream
commit ff2e6b75f39f0d474ecca451ac1a47c0183e9a6f
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date: 2018-01-24T17:07:14Z
[FLINK-8421] [core] Introduce ByteArrayPrependedInputStream
commit 8bd955d701f9f9278a5e52befea4308f42a60b45
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date: 2018-01-24T17:08:52Z
[FLINK-8421] [core] Introduce PostVersionedIOReadableWritable
commit bcdc1f14d29ef272d07c8e52c46a355ac565d853
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date: 2018-01-24T17:09:44Z
[FLINK-8421] [DataStream] Make timer serializers reconfigurable on restore
Previously, the key and namespace serializers for the
HeapInternalTimerService were not reconfigured on restore to be compatible
with previously written serializers.
This caused an immediate error when restoring savepoints in Flink 1.4.0,
since in Flink 1.4.0 we changed the base registrations in the Kryo
serializer. That change requires serializer reconfiguration.
This commit fixes this by also writing the serializer configuration
snapshots of the key and namespace serializers into savepoints, and using
them to reconfigure the new serializers on restore. This improvement also
comes along with making the written data for timer service snapshots
versioned. Backwards compatibility with previous non-versioned formats
is not broken.
----
> HeapInternalTimerService should reconfigure compatible key / namespace
> serializers on restore
> ---------------------------------------------------------------------------------------------
>
> Key: FLINK-8421
> URL: https://issues.apache.org/jira/browse/FLINK-8421
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.4.0, 1.5.0
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Fix For: 1.3.3, 1.5.0, 1.4.1
>
>
> The {{HeapInternalTimerService}} still uses simple {{equals}} checks on
> restored / newly provided serializers to determine compatibility. These should
> be replaced with {{TypeSerializer::ensureCompatibility}} checks,
> so that new serializers can be reconfigured.
> This would entail that the {{TypeSerializerConfiguration}} of the key and
> namespace serializer in the {{HeapInternalTimerService}} also needs to be
> written to the raw state.
> For the Flink 1.4.0 release and current master, this is a critical bug since the
> {{KryoSerializer}} has different default base registrations than before due
> to FLINK-7420, i.e., if the key of a window is serialized using the
> {{KryoSerializer}} in 1.3.x, the restore would never succeed in 1.4.0.
> For 1.3.x, this fix would be an improvement, such that the
> {{HeapInternalTimerService}} restore will make use of serializer
> reconfiguration.
> Other remarks:
> * We need to double check all operators that checkpoint / restore from
> **raw** state. Apparently, the serializer compatibility checks were only
> implemented for managed state.
> * Migration ITCases apparently do not have enough coverage. A migration test
> job that uses a key type which required the {{KryoSerializer}}, and uses
> windows, would have caught this issue.