[ 
https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711495#comment-16711495
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-11087:
---------------------------------------------

I checked the key / value serializer association in the 
{{DefaultOperatorStateBackend}}; it seems to be correct.

The problem, as [~edRojas] has indicated, is that when restoring from 1.5 
(regardless of whether the restore happens in 1.6 or 1.7), the serializer keys 
( {{KEY_SERIALIZER}} and {{VALUE_SERIALIZER}} ) in the state meta info end up 
swapped.
This is caused by the following line: 
[https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165]
For broadcast state meta info, the first serializer read (which is the key 
serializer) is assigned the {{VALUE_SERIALIZER}} key, while the second 
serializer read (which is the value serializer) is assigned the 
{{KEY_SERIALIZER}} key.
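
To make the swap concrete, here is a minimal toy model of the mix-up. It is not the Flink code: the class {{SerializerSwapSketch}}, the enum, and both methods are hypothetical names made up for illustration, and serializers are represented by plain strings. It only assumes what is described above, namely that the key serializer is read first and the value serializer second.

```java
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

public class SerializerSwapSketch {

    // Hypothetical stand-in for the keys under which the state meta info
    // stores its serializers.
    enum SerializerKey { KEY_SERIALIZER, VALUE_SERIALIZER }

    // Toy model of the suspected bug: the first serializer read (the key
    // serializer) is filed under VALUE_SERIALIZER, and the second one
    // (the value serializer) under KEY_SERIALIZER.
    static Map<SerializerKey, String> readSwapped(List<String> inReadOrder) {
        Map<SerializerKey, String> metaInfo = new EnumMap<>(SerializerKey.class);
        metaInfo.put(SerializerKey.VALUE_SERIALIZER, inReadOrder.get(0));
        metaInfo.put(SerializerKey.KEY_SERIALIZER, inReadOrder.get(1));
        return metaInfo;
    }

    // The association one would expect: first read -> key, second read -> value.
    static Map<SerializerKey, String> readExpected(List<String> inReadOrder) {
        Map<SerializerKey, String> metaInfo = new EnumMap<>(SerializerKey.class);
        metaInfo.put(SerializerKey.KEY_SERIALIZER, inReadOrder.get(0));
        metaInfo.put(SerializerKey.VALUE_SERIALIZER, inReadOrder.get(1));
        return metaInfo;
    }

    public static void main(String[] args) {
        // Read order for the reported job: key serializer first, then value serializer.
        List<String> inReadOrder = Arrays.asList("StringSerializer", "JsonSerializer");
        System.out.println("swapped:  " + readSwapped(inReadOrder));
        System.out.println("expected: " + readExpected(inReadOrder));
    }
}
```

With the swapped association, the restored "old" key serializer is the JsonSerializer, so comparing it against the job's StringSerializer would plausibly produce the reported "new key serializer ... must not be incompatible" failure.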

This looks like a bug that affects both Flink 1.6.x and Flink 1.7.0.

[~stefanrichte...@gmail.com] could you confirm this?

> Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
> -------------------------------------------------------------
>
>                 Key: FLINK-11087
>                 URL: https://issues.apache.org/jira/browse/FLINK-11087
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.7.0
>         Environment: Migration from Flink 1.5.3 to Flink 1.7.0
>            Reporter: Edward Rojas
>            Priority: Major
>              Labels: Migration, State, broadcast
>
> When upgrading from Flink 1.5.3 to Flink 1.7.0, the migration of broadcast 
> state throws the following error:
> {noformat}
> org.apache.flink.util.StateMigrationException: The new key serializer for broadcast state must not be incompatible.
> at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:238)
> at org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:745){noformat}
> The broadcast state uses a MapState with StringSerializer as the key 
> serializer and a custom JsonSerializer as the value serializer. 
> There were no changes to the TypeSerializers used, only the version upgrade. 
>  
> With some debugging, I see that when state compatibility is validated in 
> the DefaultOperatorStateBackend class, the 
> "*registeredBroadcastStates*" field, which holds the data about the 'old' 
> state, contains the wrong association of key and value serializers. That is, 
> JsonSerializer appears as the key serializer and StringSerializer appears as 
> the value serializer, when it should be the other way around.
>  
> After more digging, I see that the "OperatorBackendStateMetaInfoReaderV2V3" 
> class is responsible for this swap, here:
> https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
