gaoyunhaii commented on a change in pull request #16655:
URL: https://github.com/apache/flink/pull/16655#discussion_r683329051
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.java
##########
@@ -114,8 +115,14 @@ protected void serializeOperatorState(OperatorState operatorState, DataOutputStr
operatorState.getSubtaskStates();
dos.writeInt(subtaskStateMap.size());
for (Map.Entry<Integer, OperatorSubtaskState> entry : subtaskStateMap.entrySet()) {
- dos.writeInt(entry.getKey());
- serializeSubtaskState(entry.getValue(), dos);
+ if (entry.getValue().isFinished()) {
+ // We store a negative index for the finished subtask. In consideration
+ // of the index 0, the negative index would start from -1.
+ dos.writeInt(-(entry.getKey() + 1));
Review comment:
Hi @pnowojski, sorry, I might not fully understand the issue. For one thing,
I think we would only mark a subtask as `isFinished = true` in the checkpoint
if it is indeed FINISHED when the checkpoint is taken. If it has called
`operator.finish()` and is waiting for the final checkpoint, then in the
following checkpoints (namely `42` to `44`) we should still be able to
snapshot its state, so in this case we would not have the issue of discarding
part of the keyed state?

Then in the checkpoint, we might have:
1. If some subtasks of an operator are finished when the checkpoint is taken,
we would save a finished flag (in some way) for these subtasks.
2. If all the subtasks of an operator are finished, we would directly save a
finished flag for the whole operator.

Since the FINISHED state of a task won't be reverted, it would also be marked
as FINISHED in all the following checkpoints?
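
To make the per-subtask flag from the diff concrete, here is a minimal sketch of the `-(index + 1)` encoding and one possible way to read it back. The class `FinishedIndexCodec` and its method names are hypothetical and only for illustration; they are not part of `MetadataV3Serializer`, and the decoding side is an assumption about how a reader could interpret the sign.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical helper, not a Flink class: illustrates the sign-based finished flag.
class FinishedIndexCodec {

    // A finished subtask is written as -(index + 1), so index 0 becomes -1
    // and can still be told apart from a running subtask with index 0.
    static int encode(int subtaskIndex, boolean finished) {
        return finished ? -(subtaskIndex + 1) : subtaskIndex;
    }

    // Assumed reader side: any negative value marks a finished subtask.
    static boolean isFinished(int encoded) {
        return encoded < 0;
    }

    // Assumed reader side: invert the encoding to recover the original index.
    static int decodeIndex(int encoded) {
        return encoded < 0 ? -encoded - 1 : encoded;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(bytes)) {
            dos.writeInt(encode(0, true));  // finished subtask 0
            dos.writeInt(encode(3, false)); // running subtask 3
        }
        try (DataInputStream dis =
                new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            for (int i = 0; i < 2; i++) {
                int encoded = dis.readInt();
                System.out.println(
                        "index=" + decodeIndex(encoded) + " finished=" + isFinished(encoded));
            }
        }
    }
}
```

With this scheme the finished flag costs no extra bytes per subtask, at the price of the reader having to know that a negative index is a marker rather than an error.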