[ https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16716704#comment-16716704 ]
ASF GitHub Bot commented on FLINK-11087:
----------------------------------------

asfgit closed pull request #7256: [FLINK-11087] [state] Incorrect K/V serializer association when reading broadcast state from 1.5.x snapshots
URL: https://github.com/apache/flink/pull/7256

This pull request was merged from a forked repository. Since GitHub hides the original diff on merge, it is reproduced below for the sake of provenance:

{noformat}
diff --git a/docs/ops/upgrading.md b/docs/ops/upgrading.md
index 0451940723e..22cea4c53f0 100644
--- a/docs/ops/upgrading.md
+++ b/docs/ops/upgrading.md
@@ -274,7 +274,10 @@ Savepoints are compatible across Flink versions as indicated by the table below:
         <td class="text-center">O</td>
         <td class="text-center">O</td>
         <td class="text-center">O</td>
-        <td class="text-left"></td>
+        <td class="text-left">There is a known issue with resuming broadcast state created with 1.5.x in versions
+        1.6.x up to 1.6.2, and 1.7.0: <a href="https://issues.apache.org/jira/browse/FLINK-11087">FLINK-11087</a>. Users
+        upgrading to 1.6.x or 1.7.x series need to directly migrate to minor versions higher than 1.6.2 and 1.7.0,
+        respectively.</td>
       </tr>
       <tr>
         <td class="text-center"><strong>1.6.x</strong></td>
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java
index 77c267adff1..836edef0aac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java
@@ -132,11 +132,6 @@ public StateMetaInfoSnapshot readStateMetaInfoSnapshot(
 
 		static final OperatorBackendStateMetaInfoReaderV2V3 INSTANCE = new OperatorBackendStateMetaInfoReaderV2V3();
 
-		private static final String[] ORDERED_KEY_STRINGS =
-			new String[]{
-				StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString(),
-				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()};
-
 		@Nonnull
 		@Override
 		public StateMetaInfoSnapshot readStateMetaInfoSnapshot(
@@ -156,17 +151,25 @@ public StateMetaInfoSnapshot readStateMetaInfoSnapshot(
 			final int listSize = stateSerializerAndConfigList.size();
 			StateMetaInfoSnapshot.BackendStateType stateType = listSize == 1 ?
 				StateMetaInfoSnapshot.BackendStateType.OPERATOR : StateMetaInfoSnapshot.BackendStateType.BROADCAST;
 
-			Map<String, TypeSerializerSnapshot<?>> serializerConfigsMap = new HashMap<>(listSize);
-			for (int i = 0; i < listSize; ++i) {
-				Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> serializerAndConf =
-					stateSerializerAndConfigList.get(i);
-
-				// this particular mapping happens to support both, V2 and V3
-				String serializerKey = ORDERED_KEY_STRINGS[ORDERED_KEY_STRINGS.length - 1 - i];
-				serializerConfigsMap.put(
-					serializerKey,
-					serializerAndConf.f1);
+			Map<String, TypeSerializerSnapshot<?>> serializerConfigsMap = new HashMap<>(listSize);
+			switch (stateType) {
+				case OPERATOR:
+					serializerConfigsMap.put(
+						StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
+						stateSerializerAndConfigList.get(0).f1);
+					break;
+				case BROADCAST:
+					serializerConfigsMap.put(
+						StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString(),
+						stateSerializerAndConfigList.get(0).f1);
+
+					serializerConfigsMap.put(
+						StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
+						stateSerializerAndConfigList.get(1).f1);
+					break;
+				default:
+					throw new IllegalStateException("Unknown operator state type " + stateType);
 			}
 
 			return new StateMetaInfoSnapshot(
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
index f7923620850..3f49d84d595 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
@@ -113,17 +113,17 @@ public void testSavepoint() throws Exception {
 		expectedFirstState.put(2L, 2L);
 		expectedFirstState.put(3L, 3L);
 
-		final Map<String, String> expectedSecondState = new HashMap<>();
-		expectedSecondState.put("0", "0");
-		expectedSecondState.put("1", "1");
-		expectedSecondState.put("2", "2");
-		expectedSecondState.put("3", "3");
-
-		final Map<String, String> expectedThirdState = new HashMap<>();
-		expectedThirdState.put("0", "0");
-		expectedThirdState.put("1", "1");
-		expectedThirdState.put("2", "2");
-		expectedThirdState.put("3", "3");
+		final Map<String, Long> expectedSecondState = new HashMap<>();
+		expectedSecondState.put("0", 0L);
+		expectedSecondState.put("1", 1L);
+		expectedSecondState.put("2", 2L);
+		expectedSecondState.put("3", 3L);
+
+		final Map<Long, String> expectedThirdState = new HashMap<>();
+		expectedThirdState.put(0L, "0");
+		expectedThirdState.put(1L, "1");
+		expectedThirdState.put(2L, "2");
+		expectedThirdState.put(3L, "3");
 
 		if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.PERFORM_SAVEPOINT) {
 			nonParallelSource = new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
@@ -171,12 +171,12 @@ public Long getKey(Tuple2<Long, Long> value) throws Exception {
 			"broadcast-state-1", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO
 		);
 
-		final MapStateDescriptor<String, String> secondBroadcastStateDesc = new MapStateDescriptor<>(
-			"broadcast-state-2", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+		final MapStateDescriptor<String, Long> secondBroadcastStateDesc = new MapStateDescriptor<>(
+			"broadcast-state-2", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO
 		);
 
-		final MapStateDescriptor<String, String> thirdBroadcastStateDesc = new MapStateDescriptor<>(
-			"broadcast-state-3", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+		final MapStateDescriptor<Long, String> thirdBroadcastStateDesc = new MapStateDescriptor<>(
+			"broadcast-state-3", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
 		);
 
 		BroadcastStream<Tuple2<Long, Long>> npBroadcastStream = env
@@ -234,7 +234,7 @@ private String getBroadcastSavepointPath(MigrationVersion savepointVersion, Stri
 
 		private MapStateDescriptor<Long, Long> firstStateDesc;
-		private MapStateDescriptor<String, String> secondStateDesc;
+		private MapStateDescriptor<String, Long> secondStateDesc;
 
 		@Override
 		public void open(Configuration parameters) throws Exception {
@@ -245,7 +245,7 @@ public void open(Configuration parameters) throws Exception {
 			);
 
 			secondStateDesc = new MapStateDescriptor<>(
-				"broadcast-state-2", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+				"broadcast-state-2", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO
 			);
 		}
@@ -257,7 +257,7 @@ public void processElement(Tuple2<Long, Long> value, ReadOnlyContext ctx, Collec
 
 		@Override
 		public void processBroadcastElement(Tuple2<Long, Long> value, Context ctx, Collector<Tuple2<Long, Long>> out) throws Exception {
 			ctx.getBroadcastState(firstStateDesc).put(value.f0, value.f1);
-			ctx.getBroadcastState(secondStateDesc).put(Long.toString(value.f0), Long.toString(value.f1));
+			ctx.getBroadcastState(secondStateDesc).put(Long.toString(value.f0), value.f1);
 		}
 	}
@@ -269,14 +269,14 @@ public void processBroadcastElement(Tuple2<Long, Long> value, Context ctx, Colle
 
 		private static final long serialVersionUID = 1333992081671604521L;
 
-		private MapStateDescriptor<String, String> stateDesc;
+		private MapStateDescriptor<Long, String> stateDesc;
 
 		@Override
 		public void open(Configuration parameters) throws Exception {
 			super.open(parameters);
 
 			stateDesc = new MapStateDescriptor<>(
-				"broadcast-state-3", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+				"broadcast-state-3", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
 			);
 		}
@@ -287,7 +287,7 @@ public void processElement(Tuple2<Long, Long> value, ReadOnlyContext ctx, Collec
 
 		@Override
 		public void processBroadcastElement(Tuple2<Long, Long> value, Context ctx, Collector<Tuple2<Long, Long>> out) throws Exception {
-			ctx.getBroadcastState(stateDesc).put(Long.toString(value.f0), Long.toString(value.f1));
+			ctx.getBroadcastState(stateDesc).put(value.f0, Long.toString(value.f1));
 		}
 	}
@@ -301,13 +301,13 @@ public void processBroadcastElement(Tuple2<Long, Long> value, Context ctx, Colle
 
 		private final Map<Long, Long> expectedFirstState;
-		private final Map<String, String> expectedSecondState;
+		private final Map<String, Long> expectedSecondState;
 
 		private MapStateDescriptor<Long, Long> firstStateDesc;
-		private MapStateDescriptor<String, String> secondStateDesc;
+		private MapStateDescriptor<String, Long> secondStateDesc;
 
-		CheckingKeyedBroadcastFunction(Map<Long, Long> firstState, Map<String, String> secondState) {
+		CheckingKeyedBroadcastFunction(Map<Long, Long> firstState, Map<String, Long> secondState) {
 			this.expectedFirstState = firstState;
 			this.expectedSecondState = secondState;
 		}
@@ -321,7 +321,7 @@ public void open(Configuration parameters) throws Exception {
 			);
 
 			secondStateDesc = new MapStateDescriptor<>(
-				"broadcast-state-2", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+				"broadcast-state-2", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO
 			);
 		}
@@ -334,8 +334,8 @@ public void processElement(Tuple2<Long, Long> value, ReadOnlyContext ctx, Collec
 			}
 			Assert.assertEquals(expectedFirstState, actualFirstState);
 
-			final Map<String, String> actualSecondState = new HashMap<>();
-			for (Map.Entry<String, String> entry: ctx.getBroadcastState(secondStateDesc).immutableEntries()) {
+			final Map<String, Long> actualSecondState = new HashMap<>();
+			for (Map.Entry<String, Long> entry: ctx.getBroadcastState(secondStateDesc).immutableEntries()) {
 				actualSecondState.put(entry.getKey(), entry.getValue());
 			}
 			Assert.assertEquals(expectedSecondState, actualSecondState);
@@ -357,11 +357,11 @@ public void processBroadcastElement(Tuple2<Long, Long> value, Context ctx, Colle
 
 		private static final long serialVersionUID = 1333992081671604521L;
 
-		private final Map<String, String> expectedState;
+		private final Map<Long, String> expectedState;
 
-		private MapStateDescriptor<String, String> stateDesc;
+		private MapStateDescriptor<Long, String> stateDesc;
 
-		CheckingKeyedSingleBroadcastFunction(Map<String, String> state) {
+		CheckingKeyedSingleBroadcastFunction(Map<Long, String> state) {
 			this.expectedState = state;
 		}
@@ -370,14 +370,14 @@ public void open(Configuration parameters) throws Exception {
 			super.open(parameters);
 
 			stateDesc = new MapStateDescriptor<>(
-				"broadcast-state-3", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+				"broadcast-state-3", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
 			);
 		}
 
 		@Override
 		public void processElement(Tuple2<Long, Long> value, ReadOnlyContext ctx, Collector<Tuple2<Long, Long>> out) throws Exception {
-			final Map<String, String> actualState = new HashMap<>();
-			for (Map.Entry<String, String> entry: ctx.getBroadcastState(stateDesc).immutableEntries()) {
+			final Map<Long, String> actualState = new HashMap<>();
+			for (Map.Entry<Long, String> entry: ctx.getBroadcastState(stateDesc).immutableEntries()) {
 				actualState.put(entry.getKey(), entry.getValue());
 			}
 			Assert.assertEquals(expectedState, actualState);
diff --git a/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.5-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.5-rocksdb-savepoint/_metadata
index aed61836bd9..2e02f1bf87d 100644
Binary files a/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.5-rocksdb-savepoint/_metadata and b/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.5-rocksdb-savepoint/_metadata differ
diff --git a/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.5-savepoint/_metadata b/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.5-savepoint/_metadata
index 13e8d321036..4ca91d33ccb 100644
Binary files a/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.5-savepoint/_metadata and b/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.5-savepoint/_metadata differ
diff --git a/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata
index 9e2b08654b2..8b2a63ab41b 100644
Binary files a/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata and b/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata differ
diff --git a/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.6-savepoint/_metadata b/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.6-savepoint/_metadata
index cceb5bf7f89..cde6d5c44bf 100644
Binary files a/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.6-savepoint/_metadata and b/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.6-savepoint/_metadata differ
{noformat}

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
> -------------------------------------------------------------
>
>                 Key: FLINK-11087
>                 URL: https://issues.apache.org/jira/browse/FLINK-11087
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.6.2, 1.7.0
>        Environment: Migration from Flink 1.5.3 to Flink 1.7.0
>           Reporter: Edward Rojas
>           Assignee: Tzu-Li (Gordon) Tai
>           Priority: Blocker
>             Labels: Migration, State, broadcast, pull-request-available
>            Fix For: 1.6.3, 1.8.0, 1.7.1
>
> When upgrading from Flink 1.5.3 to Flink 1.7.0, the migration of broadcast state throws the following error:
> {noformat}
> org.apache.flink.util.StateMigrationException: The new key serializer for broadcast state must not be incompatible.
> 	at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:238)
> 	at org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> 	at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The broadcast state uses a MapState with StringSerializer as the key serializer and a custom JsonSerializer as the value serializer.
> There were no changes to the TypeSerializers used, only an upgrade of the Flink version.
>
> With some debugging, I see that when the compatibility of states is validated in the DefaultOperatorStateBackend class, the *registeredBroadcastStates* map holding the metadata of the 'old' state contains the wrong association of key and value serializers: JsonSerializer appears as the key serializer and StringSerializer as the value serializer, when it should be the contrary.
>
> After more digging, I see that the OperatorBackendStateMetaInfoReaderV2V3 class is responsible for this swap, here:
> https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
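The swap described above can be reproduced with a small standalone sketch (a hypothetical reproduction, not Flink code; plain strings stand in for TypeSerializerSnapshot instances, and the method names are made up for illustration). The pre-fix reversed-index lookup happens to be correct for the single-entry operator-state case, but for broadcast state, whose key serializer snapshot is written before its value serializer snapshot, it associates the two entries the wrong way around; the fix in the PR associates them explicitly per state type:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SerializerAssociationDemo {

    static final String[] ORDERED_KEY_STRINGS = {"KEY_SERIALIZER", "VALUE_SERIALIZER"};

    // Pre-fix mapping: entry i is assigned key ORDERED_KEY_STRINGS[length - 1 - i].
    // For a single entry (operator state) this yields VALUE_SERIALIZER, which is correct;
    // for two entries (broadcast state) the on-disk key snapshot (index 0) lands under
    // VALUE_SERIALIZER and the value snapshot (index 1) under KEY_SERIALIZER.
    static Map<String, String> reversedMapping(List<String> snapshots) {
        Map<String, String> map = new HashMap<>(snapshots.size());
        for (int i = 0; i < snapshots.size(); i++) {
            map.put(ORDERED_KEY_STRINGS[ORDERED_KEY_STRINGS.length - 1 - i], snapshots.get(i));
        }
        return map;
    }

    // Post-fix mapping, mirroring the switch in the patch: explicit association per state type.
    static Map<String, String> explicitMapping(List<String> snapshots) {
        Map<String, String> map = new HashMap<>(snapshots.size());
        if (snapshots.size() == 1) {                      // operator (list) state
            map.put("VALUE_SERIALIZER", snapshots.get(0));
        } else {                                          // broadcast state: key written first
            map.put("KEY_SERIALIZER", snapshots.get(0));
            map.put("VALUE_SERIALIZER", snapshots.get(1));
        }
        return map;
    }

    public static void main(String[] args) {
        // Broadcast state from the bug report: StringSerializer key, JsonSerializer value.
        List<String> onDisk = List.of("StringSerializer", "JsonSerializer");

        Map<String, String> buggy = reversedMapping(onDisk);
        System.out.println("buggy key serializer:   " + buggy.get("KEY_SERIALIZER"));   // JsonSerializer (swapped)
        System.out.println("buggy value serializer: " + buggy.get("VALUE_SERIALIZER")); // StringSerializer (swapped)

        Map<String, String> fixed = explicitMapping(onDisk);
        System.out.println("fixed key serializer:   " + fixed.get("KEY_SERIALIZER"));   // StringSerializer
        System.out.println("fixed value serializer: " + fixed.get("VALUE_SERIALIZER")); // JsonSerializer
    }
}
```

The swapped association explains the StateMigrationException in the report: on restore, the new StringSerializer key serializer is checked for compatibility against what the metadata (wrongly) recorded as the old key serializer, the JsonSerializer.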