[
https://issues.apache.org/jira/browse/BEAM-10223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17142804#comment-17142804
]
Maximilian Michels commented on BEAM-10223:
-------------------------------------------
I think what you want to do is to keep the previous data classes around. We can
then perform a migration from the old data format to the new one. If you alter
the classes directly, this won't work because we can't load the old classes via
Java serialization. For this to work, we need to alter the current restore
behavior, which assumes the old serializer is compatible. Instead, we will have
to perform a migration from the old serializer to the new one. The only
prerequisite is that you keep your old class dependencies around.
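A minimal sketch of that migration idea, under stated assumptions: `OldDimension` and `NewDimension` below are hypothetical stand-ins (not Beam or Flink types). The old class stays on the classpath so Java deserialization of the checkpointed state still succeeds, and each restored value is then mapped onto the new class.

```java
import java.io.*;

// Sketch only: keep the old POJO class on the classpath so restore works,
// then migrate each restored value to the new POJO shape.
public class SerializerMigrationSketch {

    // Stand-in for the old POJO shape still referenced by the checkpoint.
    static class OldDimension implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        OldDimension(String name) { this.name = name; }
    }

    // Stand-in for the evolved POJO shape used by the new pipeline.
    static class NewDimension implements Serializable {
        private static final long serialVersionUID = 2L;
        final String name;
        final int version;
        NewDimension(String name, int version) {
            this.name = name;
            this.version = version;
        }
    }

    // One-way migration from the old format to the new one; new fields
    // get defaults since the old data cannot supply them.
    static NewDimension migrate(OldDimension old) {
        return new NewDimension(old.name, 1);
    }

    public static void main(String[] args) throws Exception {
        // Simulate restoring an old value via Java serialization. This only
        // succeeds because OldDimension is still on the classpath.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(new OldDimension("country"));
        oos.flush();
        OldDimension restored = (OldDimension) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();

        // Then hand the migrated value to the new serializer.
        NewDimension migrated = migrate(restored);
        System.out.println(migrated.name + ":" + migrated.version);
    }
}
```

The actual hook point in Flink would be the serializer-compatibility/restore path the comment mentions; the sketch only shows the data-level migration step.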
> AvroCoder has references to the encoded/decoded class
> -----------------------------------------------------
>
> Key: BEAM-10223
> URL: https://issues.apache.org/jira/browse/BEAM-10223
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Affects Versions: 2.22.0
> Reporter: Ivan San Jose
> Priority: P2
>
> Hi, maybe the JIRA issue title is not very descriptive, but I couldn't find
> anything better, sorry.
> Let me explain the problem:
> When using Flink as runner, Beam coders are wrapped into Flink's
> TypeSerializers, and, according to
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java#L95
> , those coders are Java-serialized inside the checkpoint.
> The problem is that coders hold a reference to the class being coded/decoded
> (Class<T>), so when the coder is serialized, the POJO model class descriptor
> is serialized along with it, using Java serialization.
> Why is this a problem?
> It is a problem if you are working with checkpoints, because checkpoint
> restoring will break as soon as you change anything in the POJO model (even
> if you are using a coder which supports schema evolution and the change
> follows its evolution rules):
> {code}
> Caused by: java.io.InvalidClassException: internal.model.dimension.POJODimension; local class incompatible: stream classdesc serialVersionUID = -223148029368332375, local class serialVersionUID = 4489864664852536553
> 	at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
> 	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1884)
> 	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1750)
> 	at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1715)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1555)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:301)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:116)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.readSnapshot(TypeSerializerConfigSnapshot.java:113)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
> 	at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219)
> 	at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119)
> 	at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83)
> 	... 13 more
> {code}
> So, in order to be able to use existing checkpoints after compatible changes
> have been made to the Java model, references to Class<T> in Beam coders
> should be removed.
> Note that this JIRA ticket only refers to AvroCoder, which is the only coder
> fixed in the related GitHub pull request.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)