pnowojski commented on code in PR #21131:
URL: https://github.com/apache/flink/pull/21131#discussion_r1012777571
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -73,11 +79,27 @@ public void dispatch(ChannelStateWriteRequest request) throws Exception {
     private void dispatchInternal(ChannelStateWriteRequest request) throws Exception {
         if (request instanceof CheckpointStartRequest) {
             checkState(
-                    !writers.containsKey(request.getCheckpointId()),
-                    "writer not found for request " + request);
-            writers.put(request.getCheckpointId(), buildWriter((CheckpointStartRequest) request));
+                    request.getCheckpointId() > ongoingCheckpointId,
+                    String.format(
+                            "Checkpoint must be incremented, ongoingCheckpointId is %s, but the request is %s.",
+                            ongoingCheckpointId, request));
+            if (writer != null && !writer.isDone()) {
+                writer.fail(
+                        new IllegalStateException(
+                                String.format(
+                                        "Task[name=%s, subtaskIndex=%s] has uncompleted channelState writer of checkpointId=%s, "
+                                                + "but it received a new checkpoint start request of checkpointId=%s, it maybe "
+                                                + "a bug due to currently not supported concurrent unaligned checkpoint.",
+                                        taskName,
+                                        subtaskIndex,
+                                        ongoingCheckpointId,
+                                        request.getCheckpointId())));
+            }
Review Comment:
I was thinking about a contract where `ChannelStateWriter` is allowed to
ignore any request that's not addressed to the `ongoingCheckpointId`. So in
your scenario, `abort(44)` could be safely ignored. Once
`SubtaskCheckpointCoordinator` decides to abort or complete checkpoint `43`
(for example, due to a `notifyCheckpointAborted(43)` RPC, checkpoint `43`
completing, or checkpoint `45` starting and superseding `43`), it would take
care of calling `ChannelStateWriter#abort(43)`.
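A minimal sketch of that contract, assuming a heavily simplified, hypothetical dispatcher: `OngoingCheckpointDispatcherSketch`, `Kind`, `Request` and the `log` list are illustrative names, not Flink classes. It keeps the monotonic start check from the diff above and drops any write or abort request that is not addressed to `ongoingCheckpointId`.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of the proposed contract: track a single
 * ongoing checkpoint and ignore requests addressed to any other checkpoint.
 * Not the real ChannelStateWriteRequestDispatcherImpl.
 */
public class OngoingCheckpointDispatcherSketch {

    // Hypothetical request kinds; the real ChannelStateWriteRequest hierarchy is richer.
    enum Kind { START, WRITE, ABORT }

    record Request(Kind kind, long checkpointId) {}

    private long ongoingCheckpointId = -1L;
    private boolean writerDone = true;

    // Records each decision so the behavior is observable in this sketch.
    final List<String> log = new ArrayList<>();

    void dispatch(Request request) {
        switch (request.kind()) {
            case START -> {
                // Same invariant as the checkState in the diff: IDs must increase.
                if (request.checkpointId() <= ongoingCheckpointId) {
                    throw new IllegalStateException(
                            "Checkpoint must be incremented, ongoingCheckpointId is "
                                    + ongoingCheckpointId + ", but the request is " + request);
                }
                if (!writerDone) {
                    // The diff fails the unfinished writer; the sketch only logs it.
                    log.add("fail " + ongoingCheckpointId);
                }
                ongoingCheckpointId = request.checkpointId();
                writerDone = false;
                log.add("start " + ongoingCheckpointId);
            }
            case WRITE, ABORT -> {
                if (request.checkpointId() != ongoingCheckpointId || writerDone) {
                    // Not addressed to the ongoing checkpoint (or its writer already
                    // finished): safe to ignore under the proposed contract.
                    log.add("ignore " + request.kind() + " " + request.checkpointId());
                    return;
                }
                if (request.kind() == Kind.ABORT) {
                    writerDone = true;
                    log.add("abort " + ongoingCheckpointId);
                } else {
                    log.add("write " + ongoingCheckpointId);
                }
            }
        }
    }

    public static void main(String[] args) {
        OngoingCheckpointDispatcherSketch d = new OngoingCheckpointDispatcherSketch();
        d.dispatch(new Request(Kind.START, 43));
        d.dispatch(new Request(Kind.ABORT, 44)); // ignored: 44 is not the ongoing checkpoint
        d.dispatch(new Request(Kind.ABORT, 43)); // issued once the coordinator decides on 43
        System.out.println(d.log);
    }
}
```

Running `main` prints `[start 43, ignore ABORT 44, abort 43]`: the stray `abort(44)` is dropped, and checkpoint `43` is only aborted when the coordinator explicitly asks for it.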
> Also, maxAbortedCheckpointId is more friendly to channel state file
> merging.

OK, that might be a good argument for keeping it as it is.
--
This is an automated message from the Apache Git Service.