pnowojski commented on a change in pull request #7405: [FLINK-11249]
FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
URL: https://github.com/apache/flink/pull/7405#discussion_r245589977
##########
File path:
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
##########
@@ -813,8 +820,18 @@ public void initializeState(FunctionInitializationContext
context) throws Except
semantic = Semantic.NONE;
}
- nextTransactionalIdHintState =
context.getOperatorStateStore().getUnionListState(
+ ListState<NextTransactionalIdHint>
oldNextTransactionalIdHintState =
context.getOperatorStateStore().getUnionListState(
NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
+ nextTransactionalIdHintState =
context.getOperatorStateStore().getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2);
+
+ //migrate and let the new state can be compatible with old state
+ if (oldNextTransactionalIdHintState != null &&
oldNextTransactionalIdHintState.get() != null) {
Review comment:
Is this condition correct? Isn't the result of `getUnionListState` always
not null and instead it returns empty list? Otherwise there would be a bug in:
```
ArrayList<NextTransactionalIdHint> transactionalIdHints =
Lists.newArrayList(nextTransactionalIdHintState.get());
```
Probably this should be changed to check if
`oldNextTransactionalIdHintState.get().isEmpty()`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services