fredia commented on code in PR #19907:
URL: https://github.com/apache/flink/pull/19907#discussion_r895925820
##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java:
##########
@@ -141,6 +158,29 @@ public void storeLocalState(
subtaskIndex);
}
+ for (Map.Entry<OperatorID, OperatorSubtaskState> subtaskStateEntry :
+ localState.getSubtaskStateMappings()) {
+ for (KeyedStateHandle keyedStateHandle :
+ subtaskStateEntry.getValue().getManagedKeyedState()) {
+ if (keyedStateHandle instanceof ChangelogStateBackendHandle) {
+ ChangelogStateBackendHandle changelogStateBackendHandle =
+ (ChangelogStateBackendHandle) keyedStateHandle;
+ long materializationID = changelogStateBackendHandle.getMaterializationID();
+ if (currentMaterializationID == null
+ || materializationID != currentMaterializationID.f0) {
+ currentMaterializationID = Tuple2.of(materializationID, checkpointId);
+ referredByCheckpointID.clear();
Review Comment:
I added `lastCheckpointId` to check whether a checkpoint arrives out of order.
- If it does, it is not written to the local store and the method returns
directly. Depending on whether a newer materialization is involved, there are
two cases:
  - With a newer materialization, e.g. (1) cp1 with m1, (2) cp3 with m2,
(3) cp2 with m1: when cp3 is confirmed, cp1 and m1 are discarded, and cp2 is
not written to the local store. Therefore nothing is kept apart from cp3 and
m2.
  - Without a newer materialization, e.g. (1) cp1 with m1, (2) cp3 with m1,
(3) cp2 with m1: when cp3 is confirmed, m1 is retained, and cp2 is not written
to the local store. In the end, m1 and cp3 are kept.
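
To make the two cases easier to check, here is a minimal, self-contained sketch of the out-of-order guard. It is not the code in this PR: `LocalStoreSketch`, `store`, `confirm`, and the two `kept...` helpers are hypothetical names, and the store is modelled as a plain map from checkpoint id to materialization id. Only `lastCheckpointId` comes from the change itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical stand-in for the local state store; NOT TaskLocalStateStoreImpl. */
class LocalStoreSketch {
    // Highest checkpoint id accepted so far (the guard described above).
    private long lastCheckpointId = -1L;

    // Assumption: checkpoint id -> materialization id of the stored state.
    private final TreeMap<Long, Long> stored = new TreeMap<>();

    /** Returns false when the checkpoint is out of order and was skipped. */
    boolean store(long checkpointId, long materializationId) {
        if (checkpointId < lastCheckpointId) {
            // Out of order: do not write to the local store, return directly.
            return false;
        }
        lastCheckpointId = checkpointId;
        stored.put(checkpointId, materializationId);
        return true;
    }

    /** Confirming a checkpoint drops every older stored checkpoint. */
    void confirm(long checkpointId) {
        stored.headMap(checkpointId, false).clear();
    }

    /** Checkpoint ids still present, in ascending order. */
    List<Long> keptCheckpoints() {
        return new ArrayList<>(stored.keySet());
    }

    /** Materialization ids still referenced by a kept checkpoint. */
    List<Long> keptMaterializations() {
        return new ArrayList<>(new TreeSet<>(stored.values()));
    }
}
```

Replaying the first case as `store(1, 1)`, `store(3, 2)`, `store(2, 1)`, `confirm(3)` leaves only cp3 and m2 in this model; replaying the second case with m1 throughout leaves cp3 and m1.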