[ https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711495#comment-16711495 ]
Tzu-Li (Gordon) Tai commented on FLINK-11087:
---------------------------------------------

I checked the key / value serializer association in the {{DefaultOperatorStateBackend}}; it seems to be correct.
The problem, as [~edRojas] has indicated, is that when restoring from 1.5 (regardless of whether the restore happens in 1.6 or 1.7), the serializer keys ({{KEY_SERIALIZER}} and {{VALUE_SERIALIZER}}) in the state meta info have swapped positions.

Because of this line, [https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165], for broadcast state meta info the first serializer read (which is the key serializer) is assigned the {{VALUE_SERIALIZER}} key, while the second serializer read (which is the value serializer) is assigned the {{KEY_SERIALIZER}} key.

This looks like a bug that affects both Flink 1.6.x and Flink 1.7.0. [~stefanrichte...@gmail.com] could you confirm this?

> Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
> -------------------------------------------------------------
>
>                 Key: FLINK-11087
>                 URL: https://issues.apache.org/jira/browse/FLINK-11087
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.7.0
>         Environment: Migration from Flink 1.5.3 to Flink 1.7.0
>            Reporter: Edward Rojas
>            Priority: Major
>              Labels: Migration, State, broadcast
>
> When upgrading from Flink 1.5.3 to Flink 1.7.0, the migration of broadcast
> state throws the following error:
> {noformat}
> org.apache.flink.util.StateMigrationException: The new key serializer for broadcast state must not be incompatible.
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:238)
>     at org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>     at java.lang.Thread.run(Thread.java:745){noformat}
> The broadcast is using a MapState with StringSerializer as key serializer and
> a custom JsonSerializer as value serializer.
> There were no changes in the TypeSerializers used, only an upgrade of the Flink version.
>
> With some debugging I see that at the moment of the validation of the
> compatibility of states in the DefaultOperatorStateBackend class, the
> "*registeredBroadcastStates*" containing the data about the 'old' state
> contains the wrong association of the key and value serializers. That is,
> JsonSerializer appears as the key serializer and StringSerializer appears as the
> value serializer (when it should be the other way around).
>
> After more digging, I see that the "OperatorBackendStateMetaInfoReaderV2V3"
> class is responsible for this swap here:
> https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
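A minimal, hypothetical Java sketch of the swap described in this issue may help readers reproduce the reasoning. It does not reproduce Flink's actual LegacyStateMetaInfoReaders code: the class LegacyBroadcastMetaInfoSketch, the enum SerializerKey, and the methods buggyAssign, fixedAssign and keySerializerCompatible are illustrative assumptions, and serializers are represented only by their class names as plain strings. It assumes, as stated in the comment, that the legacy meta info yields the key serializer first and the value serializer second.

```java
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

public class LegacyBroadcastMetaInfoSketch {

    // Hypothetical stand-in for the serializer keys stored in the state meta info.
    public enum SerializerKey { KEY_SERIALIZER, VALUE_SERIALIZER }

    // Mirrors the reported behavior: the first serializer read (the key
    // serializer) is stored under VALUE_SERIALIZER, and vice versa.
    public static Map<SerializerKey, String> buggyAssign(List<String> readInOrder) {
        requireTwo(readInOrder);
        Map<SerializerKey, String> serializers = new EnumMap<>(SerializerKey.class);
        serializers.put(SerializerKey.VALUE_SERIALIZER, readInOrder.get(0));
        serializers.put(SerializerKey.KEY_SERIALIZER, readInOrder.get(1));
        return serializers;
    }

    // Corrected mapping: the serializers are read key first, then value.
    public static Map<SerializerKey, String> fixedAssign(List<String> readInOrder) {
        requireTwo(readInOrder);
        Map<SerializerKey, String> serializers = new EnumMap<>(SerializerKey.class);
        serializers.put(SerializerKey.KEY_SERIALIZER, readInOrder.get(0));
        serializers.put(SerializerKey.VALUE_SERIALIZER, readInOrder.get(1));
        return serializers;
    }

    // Hypothetical, greatly simplified compatibility check: the restored key
    // serializer must match the key serializer the job registers after the upgrade.
    public static boolean keySerializerCompatible(Map<SerializerKey, String> restored,
                                                  String newKeySerializer) {
        return newKeySerializer.equals(restored.get(SerializerKey.KEY_SERIALIZER));
    }

    private static void requireTwo(List<String> readInOrder) {
        if (readInOrder.size() != 2) {
            throw new IllegalArgumentException("broadcast state meta info expects exactly two serializers");
        }
    }

    public static void main(String[] args) {
        // Order in which the serializers come back from the 1.5 savepoint: key, then value.
        List<String> readInOrder = Arrays.asList("StringSerializer", "JsonSerializer");

        Map<SerializerKey, String> buggy = buggyAssign(readInOrder);
        Map<SerializerKey, String> fixed = fixedAssign(readInOrder);

        System.out.println("buggy: " + buggy
                + " compatible=" + keySerializerCompatible(buggy, "StringSerializer"));
        System.out.println("fixed: " + fixed
                + " compatible=" + keySerializerCompatible(fixed, "StringSerializer"));
    }
}
```

With the buggy mapping, the restored state reports JsonSerializer as its key serializer, so a check against the newly registered StringSerializer key serializer fails. That matches the "new key serializer for broadcast state must not be incompatible" error in the report. With the corrected mapping the same check passes.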