GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/4103

    [FLINK-6883] [scala, state] Fix restore of Scala type states

    This PR is based on #4090. Taken together with #4090, it fixes 
restoring savepoints that contain state of Scala types.
    
    ## What this fixes
    
    This PR fixes the problem that the exact same job code, written with the 
Flink Scala API and using Scala types as state, produces different class names 
for the anonymous serializer classes of case classes / collection types 
depending on whether it is compiled against a pre-1.3 version or Flink 1.3.
    
    The root cause is that in 1.3, prior to this PR, the 
`TypeSerializer` base class additionally implemented the `TypeDeserializer` 
interface. This alters the order in which the Scala compiler generates the 
anonymous serializer classes, and therefore results in different generated names.
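
    As a rough illustration of why generated names depend on order, the 
sketch below uses plain Java rather than the Scala compiler or any Flink class. 
`AnonymousNamingDemo` and its `Serializer` interface are hypothetical stand-ins. 
With javac, anonymous classes are typically numbered in the order they appear 
in the enclosing class, so adding or reordering one shifts the names of the others.

```java
import java.util.ArrayList;
import java.util.List;

final class AnonymousNamingDemo {

    /** Hypothetical stand-in for a serializer type; not Flink's TypeSerializer. */
    interface Serializer { }

    /** Returns the compiler-generated names of two anonymous classes, in source order. */
    static List<String> generatedNames() {
        Serializer first = new Serializer() { };   // first anonymous class in this file
        Serializer second = new Serializer() { };  // second anonymous class in this file
        List<String> names = new ArrayList<>();
        names.add(first.getClass().getName());
        names.add(second.getClass().getName());
        return names;
    }

    public static void main(String[] args) {
        // The numeric suffix of each name reflects the position of the class in the source.
        System.out.println(generatedNames());
    }
}
```

    Anything that persists such a name and later resolves the class by it 
will fail to find the class once the numbering shifts.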
    
    To fix this, the `TypeSerializer` base class no longer implements 
`TypeDeserializer`. This does not affect any user-facing interfaces of the 
serializer compatibility functionality (i.e. the `CompatibilityResult`, 
`TypeSerializer#snapshotConfiguration`, and `TypeSerializer#ensureCompatibility` 
interfaces are not broken).
    
    With this fix, we can at least guarantee that Scala jobs with Scala type 
states can be restored across Flink major versions, provided that 1) the 
same compiler is used, and 2) the user code remains untouched (i.e. the invocation 
order of the Scala `createTypeInformation` macro remains the same).
    
    ## Tests
    
    This PR also adds tests to guard against similar problems in the future. 
They include:
    1. A `ScalaSerializersMigrationTest` to guard against different generated 
classnames for anonymous serializers across changes to the codebase. The tested 
classnames are what they were in Flink 1.1 and 1.2.
    2. A `scala.StatefulJobSavepointITCase` to test end-to-end migration from 
1.2.x / 1.3.x for Scala jobs. The 1.2 savepoints were generated under the 
`release-1.2` branch.
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-6883

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4103.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4103
    
----
commit c9696c20d61ecba26fc19b4a7cdbb16586d30894
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date:   2017-06-08T06:52:04Z

    [hotfix] [scala] Fix instantiation of Scala serializers' config snapshot 
classes
    
    Prior to this commit, the configuration snapshot classes of Scala
    serializers did not have the proper default empty constructor that is
    used for deserializing the configuration snapshot.
    
    Since some Scala serializers' config snapshots extend the Java
    CompositeTypeSerializerConfigSnapshot, their config snapshot classes are
    also changed to be implemented in Java since in Scala we can only call a
    single base class constructor from subclasses.
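
    A minimal sketch of why the empty constructor matters, assuming the
    snapshot is instantiated reflectively and then populated from bytes.
    `SnapshotInstantiationDemo` and its nested `ConfigSnapshot` are
    hypothetical stand-ins, not Flink's actual classes or read path.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class SnapshotInstantiationDemo {

    /** Hypothetical config snapshot; not Flink's TypeSerializerConfigSnapshot. */
    static class ConfigSnapshot {
        private int version;

        /** Empty constructor used when the snapshot is re-created reflectively. */
        public ConfigSnapshot() { }

        /** Constructor used when the snapshot is first taken. */
        public ConfigSnapshot(int version) { this.version = version; }

        void write(DataOutputStream out) throws IOException { out.writeInt(version); }

        void read(DataInputStream in) throws IOException { version = in.readInt(); }

        int getVersion() { return version; }
    }

    /** Writes the snapshot's fields to a byte array. */
    static byte[] toBytes(ConfigSnapshot snapshot) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            snapshot.write(out);
        }
        return buffer.toByteArray();
    }

    /** Instantiates the snapshot class via its empty constructor, then reads its fields. */
    static <T extends ConfigSnapshot> T restore(Class<T> type, byte[] bytes) throws Exception {
        // Throws NoSuchMethodException if the class has no empty constructor.
        T snapshot = type.getDeclaredConstructor().newInstance();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            snapshot.read(in);
        }
        return snapshot;
    }
}
```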

commit 7e20e6251385e04334ec0f06dbaa5f1f0315b530
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date:   2017-06-08T13:29:45Z

    [FLINK-6869] [scala] Specify serialVersionUID for all Scala serializers
    
    Previously, Scala serializers did not specify a serialVersionUID, which
    prevented restoring from snapshots of previous Flink versions once the
    serializers' implementations changed.
    
    The serialVersionUIDs added in this commit are identical to what they
    were (as generated by Java) in Flink 1.2, so that we can at least
    restore state that was written with the Scala serializers as of 1.2.
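
    The effect of pinning the value can be sketched in plain Java. The
    classes below are hypothetical and unrelated to Flink's serializers;
    the sketch only shows that an explicit serialVersionUID is reported
    as declared, while an undeclared one is computed from the class
    structure and can therefore change when the implementation changes.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

final class SerialVersionUidDemo {

    /** Hypothetical serializer whose UID is pinned explicitly. */
    static final class PinnedSerializer implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    /** Hypothetical serializer that relies on the UID computed by Java. */
    static final class UnpinnedSerializer implements Serializable {
        int someField;
    }

    /** Returns the UID that Java serialization would write for the given class. */
    static long uidOf(Class<? extends Serializable> type) {
        return ObjectStreamClass.lookup(type).getSerialVersionUID();
    }

    public static void main(String[] args) {
        System.out.println("pinned:   " + uidOf(PinnedSerializer.class));
        // Computed from the class name, fields, and methods, so it is implementation-dependent.
        System.out.println("unpinned: " + uidOf(UnpinnedSerializer.class));
    }
}
```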

commit bdcf354fa7416f6e1ea1251433b4d97292b219c6
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date:   2017-06-10T20:41:35Z

    [FLINK-6869] [core] Tolerate serialVersionUID mismatches for anonymous 
classed serializers
    
    This commit lets the TypeSerializerSerializationProxy tolerate
    serialVersionUID mismatches when reading anonymous serializer classes.
    
    Our Scala case class serializers require this since they are generated
    at compile time by Scala macros, and it is therefore not possible to
    fix a certain serialVersionUID for them.
    
    This commit also updates the streaming state docs to advise users to
    avoid using anonymous classes for their state serializers.
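
    One common way to tolerate such mismatches in plain Java serialization
    is to override ObjectInputStream#readClassDescriptor and substitute the
    local class descriptor. The sketch below illustrates that general
    technique only; it is not the code of TypeSerializerSerializationProxy,
    all names in it are hypothetical, and unlike this commit it does not
    restrict the tolerance to anonymous classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

final class TolerantReadDemo {

    /** Hypothetical payload standing in for a serialized serializer. */
    static final class Payload implements Serializable {
        private static final long serialVersionUID = 1L;
        final int value;
        Payload(int value) { this.value = value; }
    }

    /** ObjectInputStream that swaps in the local class descriptor on a UID mismatch. */
    static final class TolerantObjectInputStream extends ObjectInputStream {
        TolerantObjectInputStream(InputStream in) throws IOException { super(in); }

        @Override
        protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException {
            ObjectStreamClass fromStream = super.readClassDescriptor();
            ObjectStreamClass local;
            try {
                local = ObjectStreamClass.lookup(Class.forName(fromStream.getName()));
            } catch (ClassNotFoundException e) {
                return fromStream; // unknown class: let the default resolution report it
            }
            boolean mismatch = local != null
                    && local.getSerialVersionUID() != fromStream.getSerialVersionUID();
            return mismatch ? local : fromStream;
        }
    }

    /** Serializes the payload, then flips one bit of the UID stored in the stream. */
    static byte[] serializeWithWrongUid(Payload payload) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(payload);
        }
        byte[] bytes = buffer.toByteArray();
        byte[] name = Payload.class.getName().getBytes(StandardCharsets.UTF_8);
        int uidStart = indexOf(bytes, name) + name.length; // the 8-byte UID follows the class name
        bytes[uidStart + 7] ^= 1;
        return bytes;
    }

    /** Returns the first index at which needle occurs in haystack. */
    static int indexOf(byte[] haystack, byte[] needle) {
        outer:
        for (int i = 0; i + needle.length <= haystack.length; i++) {
            for (int j = 0; j < needle.length; j++) {
                if (haystack[i + j] != needle[j]) {
                    continue outer;
                }
            }
            return i;
        }
        throw new IllegalStateException("class name not found in stream");
    }

    /** Reads with the standard stream, which rejects a UID mismatch. */
    static Payload readStrict(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Payload) in.readObject();
        }
    }

    /** Reads with the tolerant stream, which accepts a UID mismatch. */
    static Payload readTolerant(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new TolerantObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Payload) in.readObject();
        }
    }
}
```

    Substituting the descriptor only works while the field layout in the
    stream still matches the local class, so this trades a hard failure
    for the risk of reading data that was written with a different layout.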

commit 31a2977c62abed0f985fdc79539b90c4152b60f1
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date:   2017-06-11T09:02:38Z

    [hotfix] [cep] Fix incorrect CompatibilityResult.requiresMigration calls in 
CEP

commit b294637c59c27ecef58ad67a123d2cfb401f51d2
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date:   2017-06-11T13:30:36Z

    [FLINK-6883] [core] Refactor TypeSerializer to not implement 
TypeDeserializer
    
    The TypeDeserializer interface is separated from the TypeSerializer
    base class because additionally implementing the TypeDeserializer
    interface alters the generation order of anonymous serializer classes
    for Scala case classes and collections.
    
    Instead, the TypeDeserializer is now used as a mixin on the
    TypeDeserializerAdapter utility, which now serves as a bridge for
    both directions (i.e. TypeSerializer to TypeDeserializer, and vice
    versa). No user interfaces are broken due to this change.

commit ff0522f6c7f0d8f645736f2769dca90de50179ae
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date:   2017-06-11T13:31:42Z

    [FLINK-6883] [tests] Add migration tests for Scala jobs
    
    This commit adds migration ITCases for jobs written using the Scala API.
    An extra concern for migration of Scala jobs is that Scala case classes
    and collections use anonymous generated serializers, which may affect
    state restore.

----

