akalash commented on a change in pull request #15959:
URL: https://github.com/apache/flink/pull/15959#discussion_r636802137
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1599,6 +1603,33 @@ private OptionalLong restoreLatestCheckpointedStateInternal(
         }
     }
+    private Map<OperatorID, OperatorState> extractOperatorStates(CompletedCheckpoint checkpoint) {
+        Map<OperatorID, OperatorState> operatorStates = checkpoint.getOperatorStates();
+
+        if (checkpoint.getCheckpointID() == checkpointIdOfIgnoredInFlightData) {
+            // rewrite the operator state with empty in-flight data.
+            for (OperatorState operatorState : operatorStates.values()) {
+                for (Map.Entry<Integer, OperatorSubtaskState> subtaskStateEntry :
+                        operatorState.getSubtaskStates().entrySet()) {
+
+                    OperatorSubtaskState subtaskState = subtaskStateEntry.getValue();
+                    if (!subtaskState.getResultSubpartitionState().isEmpty()
+                            || !subtaskState.getInputChannelState().isEmpty()) {
+                        operatorState.putState(
+                                subtaskStateEntry.getKey(),
Review comment:
There is nothing to worry about here: `putState` only replaces the value of a
key that already exists, whereas `ConcurrentModificationException` is caused by
structural modifications, i.e. adding a new key or removing one during
iteration. That said, you were right in your other comment that it doesn't make
sense to modify the checkpoint object. So I will rewrite the code to create a
new map of states rather than change the existing one.
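
To illustrate both points, here is a minimal sketch, not the actual change in this PR. It assumes the subtask states are backed by a plain `java.util.HashMap`, and it uses `String` values as stand-ins for `OperatorSubtaskState`. The class and method names are hypothetical. It shows that overwriting an existing key during iteration does not trip the fail-fast iterator, that adding a key does, and how a copy avoids touching the original map at all.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class; String values stand in for OperatorSubtaskState.
final class IterationModificationSketch {

    /** Overwrites the value of every existing key while iterating. */
    static boolean replaceValuesWhileIterating(Map<Integer, String> states) {
        try {
            for (Map.Entry<Integer, String> entry : states.entrySet()) {
                // The key already exists, so HashMap does not treat this
                // as a structural modification.
                states.put(entry.getKey(), "empty");
            }
            return true; // completed without an exception
        } catch (ConcurrentModificationException e) {
            return false;
        }
    }

    /** Adds a brand-new key on each iteration step (assumes keys below 1000). */
    static boolean addKeysWhileIterating(Map<Integer, String> states) {
        try {
            for (Integer key : states.keySet()) {
                // New key: a structural modification that the fail-fast
                // iterator detects on its next step.
                states.put(key + 1000, "new");
            }
            return true;
        } catch (ConcurrentModificationException e) {
            return false;
        }
    }

    /** Builds a new map with emptied values and leaves the input untouched. */
    static Map<Integer, String> withEmptiedValues(Map<Integer, String> states) {
        Map<Integer, String> result = new HashMap<>();
        for (Map.Entry<Integer, String> entry : states.entrySet()) {
            result.put(entry.getKey(), "empty");
        }
        return result;
    }

    private static Map<Integer, String> sample() {
        Map<Integer, String> states = new HashMap<>();
        states.put(0, "in-flight-0");
        states.put(1, "in-flight-1");
        return states;
    }

    public static void main(String[] args) {
        System.out.println(replaceValuesWhileIterating(sample())); // true
        System.out.println(addKeysWhileIterating(sample()));       // false

        Map<Integer, String> original = sample();
        Map<Integer, String> copy = withEmptiedValues(original);
        System.out.println(original.get(0) + " / " + copy.get(0)); // in-flight-0 / empty
    }
}
```

The third method mirrors the direction proposed above: the caller receives a fresh map, so the `CompletedCheckpoint` object is never mutated and the question of modification during iteration does not arise.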
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]