[ 
https://issues.apache.org/jira/browse/FLINK-5484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15827903#comment-15827903
 ] 

ASF GitHub Bot commented on FLINK-5484:
---------------------------------------

GitHub user uce opened a pull request:

    https://github.com/apache/flink/pull/3152

    [FLINK-5484] [serialization] Revert Chill version update

    This PR reverts the Twitter Chill dependency update. The version update 
breaks backwards compatibility for savepoints that contain user types 
serialized with Kryo, because Chill adds new default serializers that 
change the class IDs.
    
    In Flink 1.1, the next available class ID by default was X, and registered 
user types were assigned IDs starting at X. In Flink 1.2, the serializers newly 
added in Chill are assigned IDs starting at X before the user types are 
registered, so a user type written under a 1.1 ID can be deserialized with the 
wrong serializer.
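
    The ID shift described above can be illustrated with a toy model. This is 
only a sketch: `ToyRegistry`, `ClassIdShiftDemo`, and the type names are 
hypothetical, and the code does not use Kryo or Chill. It only assumes that 
IDs are handed out sequentially in registration order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of sequential class-ID registration. Hypothetical; not Kryo's implementation. */
class ToyRegistry {
    private final List<String> byId = new ArrayList<>();

    /** Assigns the next free ID to the given type name and returns that ID. */
    int register(String typeName) {
        byId.add(typeName);
        return byId.size() - 1;
    }

    /** Returns the type name registered under the given ID. */
    String lookup(int id) {
        return byId.get(id);
    }

    /** Registers the default types first, then the user types, as described above. */
    static ToyRegistry build(List<String> defaults, List<String> userTypes) {
        ToyRegistry registry = new ToyRegistry();
        defaults.forEach(registry::register);
        userTypes.forEach(registry::register);
        return registry;
    }
}

class ClassIdShiftDemo {
    public static void main(String[] args) {
        // Assumption: the old version ships two defaults, the new one adds a third.
        List<String> oldDefaults = Arrays.asList("DefaultA", "DefaultB");
        List<String> newDefaults = Arrays.asList("DefaultA", "DefaultB", "NewDefault");
        List<String> userTypes = Arrays.asList("UserEvent");

        ToyRegistry writer = ToyRegistry.build(oldDefaults, userTypes);
        ToyRegistry reader = ToyRegistry.build(newDefaults, userTypes);

        int writtenId = 2; // first free ID after the two old defaults ("X" in the text)
        System.out.println("written as: " + writer.lookup(writtenId)); // UserEvent
        System.out.println("read as:    " + reader.lookup(writtenId)); // NewDefault
    }
}
```

    In this model the same ID resolves to a different type on the reading side, 
which is the kind of mismatch the description above attributes to the Chill 
update.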
    
    I've verified this with a savepoint (the one that triggered this issue) 
and added a test that checks that the default registration map does not change 
between versions. Once we have proper serializer versioning, that test will 
become obsolete.
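
    The test added in this PR is not reproduced here. As a rough sketch of the 
idea, a check of this kind can compare a recorded ID-to-class map against the 
current one and report every difference. `RegistrationDiff` and its map-based 
input are hypothetical; how the current map is obtained from the serializer is 
left out.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: reports differences between two ID -> class-name maps. */
class RegistrationDiff {
    /**
     * Returns, per ID, a description "expected -> actual" for every ID whose
     * registered class changed, disappeared, or was newly added.
     */
    static Map<Integer, String> diff(Map<Integer, String> expected, Map<Integer, String> actual) {
        Map<Integer, String> changes = new TreeMap<>();
        // IDs that were recorded before but now map to another class or to nothing.
        for (Map.Entry<Integer, String> entry : expected.entrySet()) {
            String now = actual.get(entry.getKey());
            if (!entry.getValue().equals(now)) {
                String shown = (now == null) ? "(unregistered)" : now;
                changes.put(entry.getKey(), entry.getValue() + " -> " + shown);
            }
        }
        // IDs that did not exist in the recorded map.
        for (Map.Entry<Integer, String> entry : actual.entrySet()) {
            if (!expected.containsKey(entry.getKey())) {
                changes.put(entry.getKey(), "(unregistered) -> " + entry.getValue());
            }
        }
        return changes;
    }
}
```

    A test built on such a helper would fail whenever the returned map is not 
empty, flagging a dependency update that moves default registrations.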
    
    I would like to merge this to: `release-1.1`, `release-1.2`, and `master`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink 5484-kryo_1.2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3152.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3152
    
----
commit 55483b71f36b84ac57d03a9b83e0e9d9b9b98eab
Author: Ufuk Celebi <[email protected]>
Date:   2017-01-17T18:10:33Z

    [FLINK-5484] [serialization] Add test for registered Kryo types

commit ebd656310ac9e6323fc7b09632c8aef08f06ba48
Author: Ufuk Celebi <[email protected]>
Date:   2017-01-18T10:27:43Z

    Revert "[FLINK-2608] Updated Twitter Chill version."
    
    This reverts commit 0d3ff88b369fbb1b0a8fb0e8263c9ce0a9da1583.

----


> Kryo serialization changed between 1.1 and 1.2
> ----------------------------------------------
>
>                 Key: FLINK-5484
>                 URL: https://issues.apache.org/jira/browse/FLINK-5484
>             Project: Flink
>          Issue Type: Bug
>          Components: Type Serialization System
>            Reporter: Ufuk Celebi
>
> I think the way that Kryo serializes data changed between 1.1 and 1.2.
> I have a generic Object that is serialized as part of a 1.1 savepoint that I 
> cannot resume from with 1.2:
> {code}
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Job execution failed.
>       at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>       at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
>       at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
>       at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:68)
>       at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1486)
>       at com.dataartisans.DidKryoChange.main(DidKryoChange.java:74)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>       at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
>       at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
>       at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
>       at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
>       at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
>       at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
>       at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
>       at 
> org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
>       at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
>       at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1117)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>       at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:900)
>       at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:843)
>       at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:843)
>       at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>       at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>       at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalStateException: Could not initialize keyed state 
> backend.
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:286)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:199)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:649)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:636)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: 
> com.esotericsoftware.kryo.KryoException: Unable to find class: f
>       at 
> com.twitter.chill.java.UnmodifiableJavaCollectionSerializer.read(UnmodifiableJavaCollectionSerializer.java:62)
>       at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>       at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
>       at 
> org.apache.flink.migration.runtime.state.memory.AbstractMemStateSnapshot.deserialize(AbstractMemStateSnapshot.java:88)
>       at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restoreHeapState(HeapKeyedStateBackend.java:448)
>       at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restoreOldSavepointKeyedState(HeapKeyedStateBackend.java:406)
>       at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:240)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:784)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:277)
>       ... 6 more
> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: f
>       at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>       at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>       at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>       at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>       at 
> com.twitter.chill.java.UnmodifiableJavaCollectionSerializer.read(UnmodifiableJavaCollectionSerializer.java:59)
>       ... 14 more
> Caused by: java.lang.ClassNotFoundException: f
>       at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>       at java.lang.Class.forName0(Native Method)
>       at java.lang.Class.forName(Class.java:348)
>       at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
>       ... 18 more
> {code}
> Running the same program with 1.2 and triggering and resuming a savepoint 
> works.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
