[ https://issues.apache.org/jira/browse/BEAM-10223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17142113#comment-17142113 ]

Ivan San Jose commented on BEAM-10223:
--------------------------------------

Finally I was able to remove the Class references from AvroCoder, but I wasn't 
able to solve an AvroCoder issue with inferring schemas from generics using 
Avro's ReflectData, so we've changed our approach to use KryoCoder, which 
supports some degree of compatibility across POJO changes. Sorry, but I haven't 
had time to open the pull request. Nonetheless, this defect makes Flink 
checkpoints almost useless, as a Beam application will break as soon as you 
change any POJO saved in a checkpoint.
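
To see why any POJO change breaks restore, note that java.io validates the
serialVersionUID baked into the stream against the local class. A minimal,
hypothetical sketch (PojoV1/PojoV2 stand in for two versions of a checkpointed
POJO; they are not Beam classes):

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialUidDemo {
    // Two hypothetical versions of the same POJO: V2 adds one field, the
    // kind of "compatible" change that schema-evolution-aware coders allow.
    static class PojoV1 implements Serializable {
        String name;
    }

    static class PojoV2 implements Serializable {
        String name;
        int extra; // the added field
    }

    static long uidOf(Class<?> c) {
        // Without an explicit serialVersionUID, Java derives one from the
        // class structure (name, fields, methods), so structural changes
        // yield a different value and plain java.io deserialization fails
        // with InvalidClassException, as in the stack trace below.
        return ObjectStreamClass.lookup(c).getSerialVersionUID();
    }

    public static void main(String[] args) {
        System.out.println("v1 uid = " + uidOf(PojoV1.class));
        System.out.println("v2 uid = " + uidOf(PojoV2.class));
    }
}
```

Declaring an explicit `private static final long serialVersionUID` on the POJO
can paper over this particular check, but only removing the Class reference
from the coder fixes the root cause.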

> AvroCoder has references to the encoded/decoded class
> -----------------------------------------------------
>
>                 Key: BEAM-10223
>                 URL: https://issues.apache.org/jira/browse/BEAM-10223
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>    Affects Versions: 2.22.0
>            Reporter: Ivan San Jose
>            Priority: P2
>
> Hi, maybe the JIRA issue title is not very descriptive, but I couldn't 
> find anything better, sorry.
> Let me explain the problem:
> When using Flink as the runner, Beam coders are wrapped into Flink's 
> TypeSerializers, and, according to 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java#L95
>  , those coders are Java-serialized inside the checkpoint.
> The problem is that coders hold a reference to the class being 
> coded/decoded (Class<T>), so when the coder is serialized, the POJO model's 
> class descriptor is serialized along with it, using Java serialization.
> Why is this a problem?
> It is a problem if you are working with checkpoints, because checkpoint 
> restoration will break as soon as you change anything in the POJO model (even 
> if you are using a coder that supports schema evolution and the change 
> follows its evolution rules):
> {code}
> Caused by: java.io.InvalidClassException: internal.model.dimension.POJODimension; local class incompatible: stream classdesc serialVersionUID = -223148029368332375, local class serialVersionUID = 4489864664852536553
>       at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
>       at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1884)
>       at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1750)
>       at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1715)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1555)
>       at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
>       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
>       at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
>       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
>       at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
>       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
>       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
>       at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:301)
>       at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:116)
>       at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.readSnapshot(TypeSerializerConfigSnapshot.java:113)
>       at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174)
>       at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
>       at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
>       at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
>       at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219)
>       at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119)
>       at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83)
>       ... 13 more
> {code}
> So, in order to be able to use existing checkpoints after compatible changes 
> have been made to the Java model, references to Class<T> in Beam coders 
> should be removed.
> Note that this JIRA ticket refers only to AvroCoder, which is the only coder 
> fixed in the related GitHub pull request.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
