[ 
https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711635#comment-16711635
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-11087 at 12/6/18 4:06 PM:
----------------------------------------------------------------------

Confirmed, this is indeed a bug, and the cause is what we've discussed above.
This bug prevents Flink 1.6.x and 1.7.0 from restoring broadcast state from a 
1.5.x savepoint.

Setting this as a blocker for 1.6.4 and 1.7.1, and I'm providing a fix for this.
Thanks for the investigation [~edRojas].


was (Author: tzulitai):
Confirmed, this is indeed a bug, and the cause is what we've discussed above.

Setting this as a blocker for 1.6.4 and 1.7.1, providing a fix for this.
Thanks for the investigation [~edRojas].

> Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
> -------------------------------------------------------------
>
>                 Key: FLINK-11087
>                 URL: https://issues.apache.org/jira/browse/FLINK-11087
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.7.0
>         Environment: Migration from Flink 1.5.3 to Flink 1.7.0
>            Reporter: Edward Rojas
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Major
>              Labels: Migration, State, broadcast
>
> When upgrading from Flink 1.5.3 to Flink 1.7.0, the migration of broadcast 
> state throws the following error:
> {noformat}
> org.apache.flink.util.StateMigrationException: The new key serializer for 
> broadcast state must not be incompatible.
> at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:238)
> at 
> org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:745){noformat}
> The broadcast state uses a MapState with StringSerializer as the key 
> serializer and a custom JsonSerializer as the value serializer. 
> There were no changes in the TypeSerializers used, only the version upgrade. 
>  
> With some debugging I see that, when the compatibility of states is 
> validated in the DefaultOperatorStateBackend class, 
> "*registeredBroadcastStates*", which holds the data about the 'old' state, 
> contains the wrong association of key and value serializers. That is, 
> JsonSerializer appears as the key serializer and StringSerializer appears as 
> the value serializer, when it should be the other way around.
>  
> After more digging, I see that the "OperatorBackendStateMetaInfoReaderV2V3" 
> class is responsible for this swap here:
> https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165
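The swap described in the report can be illustrated with a minimal, self-contained sketch. It does not use Flink's classes or reproduce the code in LegacyStateMetaInfoReaders. Every name in it (MetaInfo, writeSerializers, readCorrect, readSwapped, keySerializerCompatible) is hypothetical. The "key first, then value" write order is an assumption for illustration, and serializers are modeled as plain strings. It only shows how reading a (key, value) serializer pair in the opposite order makes the key-serializer compatibility check fail even though the job's serializers did not change.

```java
import java.util.Arrays;
import java.util.List;

public class BroadcastSerializerSwapSketch {

    /** Hypothetical stand-in for the serializer pair of one broadcast state. */
    static final class MetaInfo {
        final String keySerializer;
        final String valueSerializer;

        MetaInfo(String keySerializer, String valueSerializer) {
            this.keySerializer = keySerializer;
            this.valueSerializer = valueSerializer;
        }
    }

    /** Assumed write order for this sketch: key serializer first, then value serializer. */
    static List<String> writeSerializers(MetaInfo info) {
        return Arrays.asList(info.keySerializer, info.valueSerializer);
    }

    /** Reads the pair back in the same order it was written. */
    static MetaInfo readCorrect(List<String> written) {
        return new MetaInfo(written.get(0), written.get(1));
    }

    /** Reads the pair back in the opposite order, which swaps key and value. */
    static MetaInfo readSwapped(List<String> written) {
        return new MetaInfo(written.get(1), written.get(0));
    }

    /** Simplified check: the restored key serializer must equal the registered one. */
    static boolean keySerializerCompatible(MetaInfo restored, MetaInfo registered) {
        return restored.keySerializer.equals(registered.keySerializer);
    }

    public static void main(String[] args) {
        // What the job registers, unchanged across the upgrade.
        MetaInfo registered = new MetaInfo("StringSerializer", "JsonSerializer");
        List<String> savepoint = writeSerializers(registered);

        MetaInfo ok = readCorrect(savepoint);
        MetaInfo swapped = readSwapped(savepoint);

        System.out.println("correct read compatible: " + keySerializerCompatible(ok, registered));
        System.out.println("swapped read compatible: " + keySerializerCompatible(swapped, registered));
        System.out.println("swapped key serializer:  " + swapped.keySerializer);
    }
}
```

With the swapped read, JsonSerializer ends up in the key position, matching the symptom reported above: the restored key serializer no longer matches the StringSerializer the job registers, so the check reports an incompatibility.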



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
