GitHub user tzulitai opened a pull request:
https://github.com/apache/flink/pull/4103
[FLINK-6883] [scala, state] Fix restore of Scala type states
This PR is based on #4090. Together with #4090, this PR fixes restoring
savepoints that contain state of Scala types.
## What this fixes
This PR fixes the problem that identical job code, written with the Flink
Scala API and using Scala types as state, generates different class names for
the anonymous serializer classes of case classes / collection types when
compiled against a pre-1.3 version and against Flink 1.3.
The root cause is that in 1.3 (prior to this PR), the `TypeSerializer` base
class additionally implements the `TypeDeserializer` interface. This alters the
order in which the Scala compiler generates the anonymous serializer classes,
and therefore results in different generated names.
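The naming effect can be illustrated with a minimal Java analogy. It is an assumption that this mirrors the Scala case closely; scalac uses its own `$anon$N` suffixes and ordering rules. javac names anonymous classes `<Enclosing>$1`, `<Enclosing>$2`, ... in the order they appear in the enclosing class, so anything that changes the order or number of generated anonymous classes shifts the names of the others. The class `AnonymousNamingDemo` is hypothetical.

```java
import java.util.Comparator;

public class AnonymousNamingDemo {

    // Two anonymous classes in the same enclosing class. javac gives them the
    // binary names AnonymousNamingDemo$1 and AnonymousNamingDemo$2, numbered
    // in the order they appear in the source.
    static final Comparator<String> FIRST = new Comparator<String>() {
        @Override
        public int compare(String a, String b) {
            return a.compareTo(b);
        }
    };

    static final Comparator<String> SECOND = new Comparator<String>() {
        @Override
        public int compare(String a, String b) {
            return b.compareTo(a);
        }
    };

    static String nameOf(Object o) {
        return o.getClass().getName();
    }

    public static void main(String[] args) {
        // If another anonymous class were added above FIRST, both numbers would
        // shift by one, even though FIRST and SECOND themselves are unchanged.
        System.out.println(nameOf(FIRST));
        System.out.println(nameOf(SECOND));
    }
}
```

A serializer whose class name was recorded in a savepoint under one such generated name cannot be found under that name if a later build assigns it a different number.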
To fix this, the `TypeSerializer` base class no longer implements
`TypeDeserializer`. This does not affect any user-facing interfaces of the
serializer compatibility functionality (i.e. the `CompatibilityResult`,
`TypeSerializer#snapshotConfiguration`, and `TypeSerializer#ensureCompatibility`
interfaces are not broken).
With this fix, we can at least guarantee that Scala jobs with Scala-typed state
can be restored across Flink major versions, provided that 1) the same Scala
compiler is used, and 2) the user code remains untouched (i.e. the invocation
order of the Scala `createTypeInformation` macro stays the same).
## Tests
This PR also adds tests to guard against similar problems in the future:
1. A `ScalaSerializersMigrationTest` that guards against changes to the
generated class names of anonymous serializers as the codebase evolves. The
expected class names are the ones they had in Flink 1.1 and 1.2.
2. A `scala.StatefulJobSavepointITCase` that tests end-to-end migration of
Scala jobs from 1.2.x / 1.3.x savepoints. The 1.2 savepoints were generated on
the `release-1.2` branch.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tzulitai/flink FLINK-6883
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/4103.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #4103
----
commit c9696c20d61ecba26fc19b4a7cdbb16586d30894
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date: 2017-06-08T06:52:04Z
[hotfix] [scala] Fix instantiation of Scala serializers' config snapshot
classes
Prior to this commit, the configuration snapshot classes of Scala
serializers did not have the default empty constructor that is required
for deserializing the configuration snapshot.
Since some Scala serializers' config snapshots extend the Java
CompositeTypeSerializerConfigSnapshot, their config snapshot classes are
now also implemented in Java: in Scala, a subclass can only call a single
base class constructor.
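A minimal Java sketch of why both constructors matter, using simplified hypothetical classes (`CompositeSnapshotBase`, `OptionSnapshot`, `BrokenSnapshot`) rather than Flink's real ones. It assumes the snapshot is first instantiated reflectively through its empty constructor before its contents are read. A Java subclass can delegate each of its constructors to a different base constructor; a Scala subclass invokes the base constructor only from its primary constructor.

```java
import java.lang.reflect.Constructor;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ConfigSnapshotConstructorDemo {

    // Hypothetical stand-in for a composite config snapshot base with two constructors.
    static abstract class CompositeSnapshotBase {
        private final List<String> nested;

        // Empty constructor: used when the snapshot is instantiated before being read.
        protected CompositeSnapshotBase() {
            this.nested = Collections.emptyList();
        }

        // Regular constructor: used when the snapshot is created from nested configs.
        protected CompositeSnapshotBase(String... nestedConfigs) {
            this.nested = Arrays.asList(nestedConfigs);
        }

        List<String> nested() {
            return nested;
        }
    }

    // Each Java constructor delegates to a different base constructor.
    public static final class OptionSnapshot extends CompositeSnapshotBase {
        public OptionSnapshot() {
            super();
        }

        public OptionSnapshot(String elementConfig) {
            super(elementConfig);
        }
    }

    // Missing the empty constructor, so reflective instantiation fails.
    public static final class BrokenSnapshot extends CompositeSnapshotBase {
        public BrokenSnapshot(String elementConfig) {
            super(elementConfig);
        }
    }

    // Mimics the first step of reading a snapshot: instantiate via the no-arg constructor.
    static boolean canInstantiate(Class<? extends CompositeSnapshotBase> cls) {
        try {
            Constructor<? extends CompositeSnapshotBase> ctor = cls.getDeclaredConstructor();
            ctor.newInstance();
            return true;
        } catch (ReflectiveOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canInstantiate(OptionSnapshot.class)); // true
        System.out.println(canInstantiate(BrokenSnapshot.class)); // false
    }
}
```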
commit 7e20e6251385e04334ec0f06dbaa5f1f0315b530
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date: 2017-06-08T13:29:45Z
[FLINK-6869] [scala] Specify serialVersionUID for all Scala serializers
Previously, Scala serializers did not specify a serialVersionUID. Since the
serializers' implementations changed, their Java-generated serialVersionUIDs
changed as well, which prevented restoring from snapshots of previous Flink
versions.
The serialVersionUIDs added in this commit are identical to what they
were (as generated by Java) in Flink 1.2, so that we can at least
restore state that was written with the Scala serializers as of 1.2.
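Pinning the UID is a single field declaration. The sketch below uses hypothetical class names and the illustrative value `1L` (the commit instead reuses the values Java generated in Flink 1.2). It uses the standard `ObjectStreamClass.lookup(...).getSerialVersionUID()` call to show that a pinned class reports the declared value, while an unpinned class reports a hash the JVM derives from the class structure, which changes when the class's members change.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialVersionUidDemo {

    // Hypothetical serializer-like class with an explicitly pinned UID.
    static class PinnedSerializer implements Serializable {
        private static final long serialVersionUID = 1L;
        int someField;
    }

    // Same shape, but the UID is computed by the JVM from the class structure.
    static class UnpinnedSerializer implements Serializable {
        int someField;
    }

    static long uidOf(Class<?> cls) {
        return ObjectStreamClass.lookup(cls).getSerialVersionUID();
    }

    public static void main(String[] args) {
        System.out.println(uidOf(PinnedSerializer.class));   // 1
        System.out.println(uidOf(UnpinnedSerializer.class)); // JVM-computed hash
    }
}
```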
commit bdcf354fa7416f6e1ea1251433b4d97292b219c6
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date: 2017-06-10T20:41:35Z
[FLINK-6869] [core] Tolerate serialVersionUID mismatches for anonymous
classed serializers
This commit makes the TypeSerializerSerializationProxy tolerate
serialVersionUID mismatches when reading anonymous-class serializers.
Our Scala case class serializers require this because they are generated
by Scala macros at compile time, so it is not possible to fix a specific
serialVersionUID for them.
This commit also updates the streaming state docs to advise users to
avoid using anonymous classes for their state serializers.
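One way to tolerate such mismatches with plain Java serialization is to override `ObjectInputStream#readClassDescriptor` and return the local class descriptor when the stream's UID differs. This is a sketch under stated assumptions, not the actual TypeSerializerSerializationProxy code: `TolerantObjectInputStream` is hypothetical, it relaxes the check only where `Class#isAnonymousClass()` is true (Scala-generated classes may need a different test), and it simulates an older build by patching the UID bytes of a freshly written stream.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.nio.ByteBuffer;

public class TolerantStreamDemo {

    interface Greeter extends Serializable {
        String greet();
    }

    // Anonymous class: its serialVersionUID is computed by the JVM, not pinned.
    static final Greeter ANON = new Greeter() {
        @Override
        public String greet() {
            return "hi";
        }
    };

    // Hypothetical stream: for anonymous classes only, accept a serialVersionUID
    // mismatch by returning the local class descriptor instead of the stream's.
    static final class TolerantObjectInputStream extends ObjectInputStream {
        TolerantObjectInputStream(InputStream in) throws IOException {
            super(in);
        }

        @Override
        protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException {
            ObjectStreamClass streamDesc = super.readClassDescriptor();
            Class<?> localClass;
            try {
                localClass = resolveClass(streamDesc);
            } catch (ClassNotFoundException e) {
                return streamDesc; // let the default code path report the failure
            }
            ObjectStreamClass localDesc = ObjectStreamClass.lookup(localClass);
            if (localDesc != null
                    && localClass.isAnonymousClass()
                    && localDesc.getSerialVersionUID() != streamDesc.getSerialVersionUID()) {
                return localDesc;
            }
            return streamDesc;
        }
    }

    // Writes ANON, then overwrites its UID in the bytes to simulate a stream written
    // by a build whose compiler yielded a different UID for the "same" class.
    static byte[] bytesWithMismatchedUid() throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(ANON);
        }
        byte[] bytes = bos.toByteArray();
        long uid = ObjectStreamClass.lookup(ANON.getClass()).getSerialVersionUID();
        byte[] uidBytes = ByteBuffer.allocate(8).putLong(uid).array();
        for (int i = 0; i + 8 <= bytes.length; i++) {
            boolean match = true;
            for (int j = 0; j < 8 && match; j++) {
                match = bytes[i + j] == uidBytes[j];
            }
            if (match) {
                ByteBuffer.wrap(bytes).putLong(i, uid + 1); // big-endian, like writeLong
                return bytes;
            }
        }
        throw new IllegalStateException("serialVersionUID not found in stream");
    }

    static String read(boolean tolerant) {
        try {
            ByteArrayInputStream bis = new ByteArrayInputStream(bytesWithMismatchedUid());
            try (ObjectInputStream in = tolerant
                    ? new TolerantObjectInputStream(bis)
                    : new ObjectInputStream(bis)) {
                return ((Greeter) in.readObject()).greet();
            }
        } catch (InvalidClassException e) {
            return "rejected"; // the standard stream refuses the mismatched UID
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(read(false)); // rejected
        System.out.println(read(true));  // hi
    }
}
```

Restricting the relaxation to anonymous classes keeps the usual UID safety check for every serializer whose UID can be pinned.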
commit 31a2977c62abed0f985fdc79539b90c4152b60f1
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date: 2017-06-11T09:02:38Z
[hotfix] [cep] Fix incorrect CompatibilityResult.requiresMigration calls in
CEP
commit b294637c59c27ecef58ad67a123d2cfb401f51d2
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date: 2017-06-11T13:30:36Z
[FLINK-6883] [core] Refactor TypeSerializer to not implement
TypeDeserializer
The TypeDeserializer interface is separated from the TypeSerializer base
class because additionally implementing the TypeDeserializer interface
alters the generation order of anonymous serializer classes for Scala
case classes and collections.
Instead, the TypeDeserializer is now used as a mixin on the
TypeDeserializerAdapter utility, which now serves as a bridge for
both directions (i.e. TypeSerializer to TypeDeserializer, and vice
versa). No user interfaces are broken due to this change.
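The bridging idea can be sketched in Java with simplified, hypothetical types (`SimpleSerializer`, `SimpleDeserializer`, `DeserializerAdapter`); Flink's actual TypeSerializer, TypeDeserializer and TypeDeserializerAdapter have different and much larger signatures. The sketch assumes the adapter both extends the serializer base class and implements the deserializer interface, so the base class itself stays free of the interface.

```java
import java.nio.charset.StandardCharsets;

public class AdapterDemo {

    // Hypothetical stand-in for the serializer base class. It does NOT
    // implement the deserializer interface below.
    static abstract class SimpleSerializer<T> {
        abstract byte[] serialize(T value);

        abstract T deserialize(byte[] bytes);
    }

    // Hypothetical stand-in for the read-only deserializer interface.
    interface SimpleDeserializer<T> {
        T deserialize(byte[] bytes);
    }

    // The only type that mixes in SimpleDeserializer: it wraps either side
    // and exposes both views.
    static final class DeserializerAdapter<T> extends SimpleSerializer<T> implements SimpleDeserializer<T> {
        private final SimpleSerializer<T> serializer;     // non-null when adapting a serializer
        private final SimpleDeserializer<T> deserializer; // non-null when adapting a deserializer

        DeserializerAdapter(SimpleSerializer<T> serializer) {
            this.serializer = serializer;
            this.deserializer = null;
        }

        DeserializerAdapter(SimpleDeserializer<T> deserializer) {
            this.serializer = null;
            this.deserializer = deserializer;
        }

        @Override
        byte[] serialize(T value) {
            if (serializer == null) {
                throw new UnsupportedOperationException("a wrapped deserializer cannot write");
            }
            return serializer.serialize(value);
        }

        @Override
        public T deserialize(byte[] bytes) {
            return serializer != null ? serializer.deserialize(bytes) : deserializer.deserialize(bytes);
        }
    }

    static final class StringSerializer extends SimpleSerializer<String> {
        @Override
        byte[] serialize(String value) {
            return value.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        String deserialize(byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
        }
    }

    // Direction 1: a serializer used where a deserializer is expected.
    static <T> SimpleDeserializer<T> asDeserializer(SimpleSerializer<T> serializer) {
        return new DeserializerAdapter<>(serializer);
    }

    // Direction 2: a deserializer used where a serializer is expected (read-only).
    static <T> SimpleSerializer<T> asSerializer(SimpleDeserializer<T> deserializer) {
        return new DeserializerAdapter<>(deserializer);
    }

    public static void main(String[] args) {
        byte[] bytes = "abc".getBytes(StandardCharsets.UTF_8);
        System.out.println(asDeserializer(new StringSerializer()).deserialize(bytes)); // abc
        SimpleSerializer<String> readOnly = asSerializer(b -> new String(b, StandardCharsets.UTF_8));
        System.out.println(readOnly.deserialize(bytes)); // abc
    }
}
```

Keeping the interface on the adapter only means concrete serializer subclasses see the same supertypes as before 1.3, which is the property the Scala fix relies on.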
commit ff0522f6c7f0d8f645736f2769dca90de50179ae
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date: 2017-06-11T13:31:42Z
[FLINK-6883] [tests] Add migration tests for Scala jobs
This commit adds migration ITCases for jobs written using the Scala API.
An extra concern for migrating Scala jobs is that Scala case classes
and collections use generated anonymous serializers, whose class names
may affect state restore.