[ https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16716704#comment-16716704 ]

ASF GitHub Bot commented on FLINK-11087:
----------------------------------------

asfgit closed pull request #7256: [FLINK-11087] [state] Incorrect K/V serializer association when reading broadcast state from 1.5.x snapshots
URL: https://github.com/apache/flink/pull/7256
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/docs/ops/upgrading.md b/docs/ops/upgrading.md
index 0451940723e..22cea4c53f0 100644
--- a/docs/ops/upgrading.md
+++ b/docs/ops/upgrading.md
@@ -274,7 +274,10 @@ Savepoints are compatible across Flink versions as indicated by the table below:
           <td class="text-center">O</td>
           <td class="text-center">O</td>
           <td class="text-center">O</td>
-          <td class="text-left"></td>
+          <td class="text-left">There is a known issue with resuming broadcast state created with 1.5.x in versions
+          1.6.x up to 1.6.2, and 1.7.0: <a href="https://issues.apache.org/jira/browse/FLINK-11087">FLINK-11087</a>. Users
+          upgrading to 1.6.x or 1.7.x series need to directly migrate to minor versions higher than 1.6.2 and 1.7.0,
+          respectively.</td>
     </tr>
     <tr>
           <td class="text-center"><strong>1.6.x</strong></td>
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java
index 77c267adff1..836edef0aac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java
@@ -132,11 +132,6 @@ public StateMetaInfoSnapshot readStateMetaInfoSnapshot(
 
                static final OperatorBackendStateMetaInfoReaderV2V3 INSTANCE = new OperatorBackendStateMetaInfoReaderV2V3();
 
-               private static final String[] ORDERED_KEY_STRINGS =
-                       new String[]{
-                               StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString(),
-                               StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()};
-
                @Nonnull
                @Override
                public StateMetaInfoSnapshot readStateMetaInfoSnapshot(
@@ -156,17 +151,25 @@ public StateMetaInfoSnapshot readStateMetaInfoSnapshot(
                        final int listSize = stateSerializerAndConfigList.size();
                        StateMetaInfoSnapshot.BackendStateType stateType = listSize == 1 ?
                                StateMetaInfoSnapshot.BackendStateType.OPERATOR : StateMetaInfoSnapshot.BackendStateType.BROADCAST;
-                       Map<String, TypeSerializerSnapshot<?>> serializerConfigsMap = new HashMap<>(listSize);
-                       for (int i = 0; i < listSize; ++i) {
-                               Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> serializerAndConf =
-                                       stateSerializerAndConfigList.get(i);
-
-                               // this particular mapping happens to support both, V2 and V3
-                               String serializerKey = ORDERED_KEY_STRINGS[ORDERED_KEY_STRINGS.length - 1 - i];
 
-                               serializerConfigsMap.put(
-                                       serializerKey,
-                                       serializerAndConf.f1);
+                       Map<String, TypeSerializerSnapshot<?>> serializerConfigsMap = new HashMap<>(listSize);
+                       switch (stateType) {
+                               case OPERATOR:
+                                       serializerConfigsMap.put(
+                                               StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
+                                               stateSerializerAndConfigList.get(0).f1);
+                                       break;
+                               case BROADCAST:
+                                       serializerConfigsMap.put(
+                                               StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString(),
+                                               stateSerializerAndConfigList.get(0).f1);
+
+                                       serializerConfigsMap.put(
+                                               StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
+                                               stateSerializerAndConfigList.get(1).f1);
+                                       break;
+                               default:
+                                       throw new IllegalStateException("Unknown operator state type " + stateType);
                        }
 
                        return new StateMetaInfoSnapshot(
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
index f7923620850..3f49d84d595 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
@@ -113,17 +113,17 @@ public void testSavepoint() throws Exception {
                expectedFirstState.put(2L, 2L);
                expectedFirstState.put(3L, 3L);
 
-               final Map<String, String> expectedSecondState = new HashMap<>();
-               expectedSecondState.put("0", "0");
-               expectedSecondState.put("1", "1");
-               expectedSecondState.put("2", "2");
-               expectedSecondState.put("3", "3");
-
-               final Map<String, String> expectedThirdState = new HashMap<>();
-               expectedThirdState.put("0", "0");
-               expectedThirdState.put("1", "1");
-               expectedThirdState.put("2", "2");
-               expectedThirdState.put("3", "3");
+               final Map<String, Long> expectedSecondState = new HashMap<>();
+               expectedSecondState.put("0", 0L);
+               expectedSecondState.put("1", 1L);
+               expectedSecondState.put("2", 2L);
+               expectedSecondState.put("3", 3L);
+
+               final Map<Long, String> expectedThirdState = new HashMap<>();
+               expectedThirdState.put(0L, "0");
+               expectedThirdState.put(1L, "1");
+               expectedThirdState.put(2L, "2");
+               expectedThirdState.put(3L, "3");
 
                if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.PERFORM_SAVEPOINT) {
                        nonParallelSource = new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
@@ -171,12 +171,12 @@ public Long getKey(Tuple2<Long, Long> value) throws Exception {
                                "broadcast-state-1", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO
                );
 
-               final MapStateDescriptor<String, String> secondBroadcastStateDesc = new MapStateDescriptor<>(
-                               "broadcast-state-2", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+               final MapStateDescriptor<String, Long> secondBroadcastStateDesc = new MapStateDescriptor<>(
+                               "broadcast-state-2", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO
                );
 
-               final MapStateDescriptor<String, String> thirdBroadcastStateDesc = new MapStateDescriptor<>(
-                               "broadcast-state-3", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+               final MapStateDescriptor<Long, String> thirdBroadcastStateDesc = new MapStateDescriptor<>(
+                               "broadcast-state-3", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
                );
 
                BroadcastStream<Tuple2<Long, Long>> npBroadcastStream = env
@@ -234,7 +234,7 @@ private String getBroadcastSavepointPath(MigrationVersion savepointVersion, Stri
 
                private MapStateDescriptor<Long, Long> firstStateDesc;
 
-               private MapStateDescriptor<String, String> secondStateDesc;
+               private MapStateDescriptor<String, Long> secondStateDesc;
 
                @Override
                public void open(Configuration parameters) throws Exception {
@@ -245,7 +245,7 @@ public void open(Configuration parameters) throws Exception {
                        );
 
                        secondStateDesc = new MapStateDescriptor<>(
-                                       "broadcast-state-2", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+                                       "broadcast-state-2", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO
                        );
                }
 
@@ -257,7 +257,7 @@ public void processElement(Tuple2<Long, Long> value, ReadOnlyContext ctx, Collec
                @Override
                public void processBroadcastElement(Tuple2<Long, Long> value, Context ctx, Collector<Tuple2<Long, Long>> out) throws Exception {
                        ctx.getBroadcastState(firstStateDesc).put(value.f0, value.f1);
-                       ctx.getBroadcastState(secondStateDesc).put(Long.toString(value.f0), Long.toString(value.f1));
+                       ctx.getBroadcastState(secondStateDesc).put(Long.toString(value.f0), value.f1);
                }
        }
 
@@ -269,14 +269,14 @@ public void processBroadcastElement(Tuple2<Long, Long> value, Context ctx, Colle
 
                private static final long serialVersionUID = 1333992081671604521L;
 
-               private MapStateDescriptor<String, String> stateDesc;
+               private MapStateDescriptor<Long, String> stateDesc;
 
                @Override
                public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
 
                        stateDesc = new MapStateDescriptor<>(
-                                       "broadcast-state-3", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+                                       "broadcast-state-3", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
                        );
                }
 
@@ -287,7 +287,7 @@ public void processElement(Tuple2<Long, Long> value, ReadOnlyContext ctx, Collec
 
                @Override
                public void processBroadcastElement(Tuple2<Long, Long> value, Context ctx, Collector<Tuple2<Long, Long>> out) throws Exception {
-                       ctx.getBroadcastState(stateDesc).put(Long.toString(value.f0), Long.toString(value.f1));
+                       ctx.getBroadcastState(stateDesc).put(value.f0, Long.toString(value.f1));
                }
        }
 
@@ -301,13 +301,13 @@ public void processBroadcastElement(Tuple2<Long, Long> value, Context ctx, Colle
 
                private final Map<Long, Long> expectedFirstState;
 
-               private final Map<String, String> expectedSecondState;
+               private final Map<String, Long> expectedSecondState;
 
                private MapStateDescriptor<Long, Long> firstStateDesc;
 
-               private MapStateDescriptor<String, String> secondStateDesc;
+               private MapStateDescriptor<String, Long> secondStateDesc;
 
-               CheckingKeyedBroadcastFunction(Map<Long, Long> firstState, Map<String, String> secondState) {
+               CheckingKeyedBroadcastFunction(Map<Long, Long> firstState, Map<String, Long> secondState) {
                        this.expectedFirstState = firstState;
                        this.expectedSecondState = secondState;
                }
@@ -321,7 +321,7 @@ public void open(Configuration parameters) throws Exception {
                        );
 
                        secondStateDesc = new MapStateDescriptor<>(
-                                       "broadcast-state-2", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+                                       "broadcast-state-2", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO
                        );
                }
 
@@ -334,8 +334,8 @@ public void processElement(Tuple2<Long, Long> value, ReadOnlyContext ctx, Collec
                        }
                        Assert.assertEquals(expectedFirstState, actualFirstState);
 
-                       final Map<String, String> actualSecondState = new HashMap<>();
-                       for (Map.Entry<String, String> entry: ctx.getBroadcastState(secondStateDesc).immutableEntries()) {
+                       final Map<String, Long> actualSecondState = new HashMap<>();
+                       for (Map.Entry<String, Long> entry: ctx.getBroadcastState(secondStateDesc).immutableEntries()) {
                                actualSecondState.put(entry.getKey(), entry.getValue());
                        }
                        Assert.assertEquals(expectedSecondState, actualSecondState);
@@ -357,11 +357,11 @@ public void processBroadcastElement(Tuple2<Long, Long> value, Context ctx, Colle
 
                private static final long serialVersionUID = 1333992081671604521L;
 
-               private final Map<String, String> expectedState;
+               private final Map<Long, String> expectedState;
 
-               private MapStateDescriptor<String, String> stateDesc;
+               private MapStateDescriptor<Long, String> stateDesc;
 
-               CheckingKeyedSingleBroadcastFunction(Map<String, String> state) {
+               CheckingKeyedSingleBroadcastFunction(Map<Long, String> state) {
                        this.expectedState = state;
                }
 
@@ -370,14 +370,14 @@ public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
 
                        stateDesc = new MapStateDescriptor<>(
-                                       "broadcast-state-3", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+                                       "broadcast-state-3", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
                        );
                }
 
                @Override
                public void processElement(Tuple2<Long, Long> value, ReadOnlyContext ctx, Collector<Tuple2<Long, Long>> out) throws Exception {
-                       final Map<String, String> actualState = new HashMap<>();
-                       for (Map.Entry<String, String> entry: ctx.getBroadcastState(stateDesc).immutableEntries()) {
+                       final Map<Long, String> actualState = new HashMap<>();
+                       for (Map.Entry<Long, String> entry: ctx.getBroadcastState(stateDesc).immutableEntries()) {
                                actualState.put(entry.getKey(), entry.getValue());
                        }
                        Assert.assertEquals(expectedState, actualState);
diff --git a/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.5-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.5-rocksdb-savepoint/_metadata
index aed61836bd9..2e02f1bf87d 100644
Binary files a/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.5-rocksdb-savepoint/_metadata and b/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.5-rocksdb-savepoint/_metadata differ
diff --git a/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.5-savepoint/_metadata b/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.5-savepoint/_metadata
index 13e8d321036..4ca91d33ccb 100644
Binary files a/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.5-savepoint/_metadata and b/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.5-savepoint/_metadata differ
diff --git a/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata
index 9e2b08654b2..8b2a63ab41b 100644
Binary files a/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata and b/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata differ
diff --git a/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.6-savepoint/_metadata b/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.6-savepoint/_metadata
index cceb5bf7f89..cde6d5c44bf 100644
Binary files a/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.6-savepoint/_metadata and b/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.6-savepoint/_metadata differ
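As a minimal, self-contained sketch of the bug the diff above removes (plain strings stand in for the real TypeSerializerSnapshot objects, and the class and method names here are illustrative, not Flink's): the deleted loop keyed element i of the snapshot list with ORDERED_KEY_STRINGS[length - 1 - i], which happens to be correct for single-element operator state but files a broadcast state's [key, value] pair under swapped map keys.

```java
import java.util.HashMap;
import java.util.Map;

public class SwapDemo {
    // Simplified stand-ins for StateMetaInfoSnapshot.CommonSerializerKeys.
    static final String KEY_SERIALIZER = "KEY_SERIALIZER";
    static final String VALUE_SERIALIZER = "VALUE_SERIALIZER";
    static final String[] ORDERED_KEY_STRINGS = {KEY_SERIALIZER, VALUE_SERIALIZER};

    // Mimics the removed loop: element i is stored under
    // ORDERED_KEY_STRINGS[length - 1 - i], i.e. in reverse order.
    static Map<String, String> oldMapping(String... snapshotsInWrittenOrder) {
        Map<String, String> map = new HashMap<>();
        for (int i = 0; i < snapshotsInWrittenOrder.length; i++) {
            String serializerKey = ORDERED_KEY_STRINGS[ORDERED_KEY_STRINGS.length - 1 - i];
            map.put(serializerKey, snapshotsInWrittenOrder[i]);
        }
        return map;
    }

    public static void main(String[] args) {
        // Operator state has a single element: reverse indexing lands on
        // VALUE_SERIALIZER, which is the correct slot.
        Map<String, String> operator = oldMapping("valueSnapshot");
        System.out.println(operator.get(VALUE_SERIALIZER)); // valueSnapshot

        // Broadcast state writes [keySnapshot, valueSnapshot]: reverse
        // indexing files each snapshot under the other one's key.
        Map<String, String> broadcast = oldMapping("keySnapshot", "valueSnapshot");
        System.out.println(broadcast.get(KEY_SERIALIZER));   // valueSnapshot
        System.out.println(broadcast.get(VALUE_SERIALIZER)); // keySnapshot
    }
}
```

The replacement switch statement in the patch makes the two cases explicit instead of relying on reverse indexing, which is why only broadcast state was affected.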


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
> -------------------------------------------------------------
>
>                 Key: FLINK-11087
>                 URL: https://issues.apache.org/jira/browse/FLINK-11087
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.6.2, 1.7.0
>         Environment: Migration from Flink 1.5.3 to Flink 1.7.0
>            Reporter: Edward Rojas
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>              Labels: Migration, State, broadcast, pull-request-available
>             Fix For: 1.6.3, 1.8.0, 1.7.1
>
>
> When upgrading from Flink 1.5.3 to Flink 1.7.0, the migration of broadcast 
> state throws the following error:
> {noformat}
> org.apache.flink.util.StateMigrationException: The new key serializer for 
> broadcast state must not be incompatible.
> at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:238)
> at 
> org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:745){noformat}
> The broadcast state uses a MapState with StringSerializer as the key 
> serializer and a custom JsonSerializer as the value serializer. 
> There were no changes to the TypeSerializers used, only the version upgrade. 
>  
> Some debugging shows that, at the moment state compatibility is validated 
> in the DefaultOperatorStateBackend class, the 
> "*registeredBroadcastStates*" map containing the data about the 'old' state 
> holds the wrong association of key and value serializers: JsonSerializer 
> appears as the key serializer and StringSerializer as the value serializer, 
> when it should be the other way around.
>  
> After more digging, I see that the "OperatorBackendStateMetaInfoReaderV2V3" 
> class is responsible for this swap, here:
> https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165
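A hedged illustration of why the swap surfaces as the StateMigrationException quoted above (strings stand in for real serializer snapshots, and the class and method names are made up for the sketch; the real check in DefaultOperatorStateBackend compares TypeSerializerSnapshot compatibility, not names): because the restored "key" slot actually holds the value serializer, restoring with the completely unchanged descriptor still fails the key-serializer compatibility check.

```java
public class BroadcastCompatSketch {
    // Restored metadata as produced by the buggy V2/V3 reader: the key and
    // value serializer snapshots have been swapped.
    static final String RESTORED_KEY_SERIALIZER = "JsonSerializer";    // really the value serializer
    static final String RESTORED_VALUE_SERIALIZER = "StringSerializer"; // really the key serializer

    // The new state descriptor is unchanged from the 1.5.3 job.
    static final String NEW_KEY_SERIALIZER = "StringSerializer";

    // Mirrors (in simplified form) the failing check in getBroadcastState().
    static void checkKeySerializerCompatibility() {
        if (!RESTORED_KEY_SERIALIZER.equals(NEW_KEY_SERIALIZER)) {
            throw new IllegalStateException(
                "The new key serializer for broadcast state must not be incompatible.");
        }
    }

    public static void main(String[] args) {
        try {
            checkKeySerializerCompatibility();
        } catch (IllegalStateException e) {
            // Reached even though no serializer was actually changed.
            System.out.println(e.getMessage());
        }
    }
}
```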



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
