pnowojski commented on code in PR #21131:
URL: https://github.com/apache/flink/pull/21131#discussion_r1012016085
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -73,11 +79,27 @@ public void dispatch(ChannelStateWriteRequest request) throws Exception {
private void dispatchInternal(ChannelStateWriteRequest request) throws Exception {
if (request instanceof CheckpointStartRequest) {
checkState(
- !writers.containsKey(request.getCheckpointId()),
- "writer not found for request " + request);
- writers.put(request.getCheckpointId(), buildWriter((CheckpointStartRequest) request));
+ request.getCheckpointId() > ongoingCheckpointId,
+ String.format(
+ "Checkpoint must be incremented, ongoingCheckpointId is %s, but the request is %s.",
+ ongoingCheckpointId, request));
+ if (writer != null && !writer.isDone()) {
+ writer.fail(
+ new IllegalStateException(
+ String.format(
+ "Task[name=%s, subtaskIndex=%s] has uncompleted channelState writer of checkpointId=%s, "
+ + "but it received a new checkpoint start request of checkpointId=%s, it maybe "
+ + "a bug due to currently not supported concurrent unaligned checkpoint.",
+ taskName,
+ subtaskIndex,
+ ongoingCheckpointId,
+ request.getCheckpointId())));
+ }
Review Comment:
Hmmm, that's a bit confusing. Indeed it looks like this might be working.
I missed the importance of this `writer.isDone()` check. It's a bit strange
that, to decide whether the writer is actually valid to use, we sometimes
check internal fields (`writer != null` and `ongoingCheckpointId ==
request.getCheckpointId()`), but other times check the state of another
class (`writer.isDone()`).
> BTW, writer cannot be set to null during abort checkpoint. There may be
> some writeInput/writeOutput after abort. If set to null, req.onWriterMissing();
> will throw exception.
Wouldn't it be more explicit if we:
1. Kept only the `writer != null` check here
2. Maybe in `ChannelStateWriteRequest#abort`, replaced
`CheckpointInProgressRequest` (this is confusing on its own) with a dedicated
`CheckpointAbortRequest`
3. When handling/dispatching `CheckpointAbortRequest` here, set
`this.writer = null` and set something like `this.abortedCheckpointId =
this.ongoingCheckpointId`
4. When trying to access the writer, checked
```
ChannelStateCheckpointWriter writer =
        ongoingCheckpointId == request.getCheckpointId()
                        && abortedCheckpointId != request.getCheckpointId()
                ? this.writer
                : null;
```
?
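To make steps 1-4 concrete, here is a rough, self-contained sketch. It is not Flink code: `AbortAwareDispatcherSketch`, `Writer`, and the `start`/`abort`/`write` methods are hypothetical stand-ins for the dispatcher, `ChannelStateCheckpointWriter`, and the request types. Treating a write that arrives after an abort as silently skipped, while a write for a never-started checkpoint still throws, is an assumption about how `abortedCheckpointId` could be used.

```java
/** Simplified model of steps 1-4; every type here is a hypothetical stand-in, not Flink code. */
public class AbortAwareDispatcherSketch {

    /** Hypothetical stand-in for ChannelStateCheckpointWriter. */
    static final class Writer {
        final long checkpointId;
        int writes;
        boolean failed;

        Writer(long checkpointId) {
            this.checkpointId = checkpointId;
        }
    }

    private long ongoingCheckpointId = -1;
    private long abortedCheckpointId = -1;
    private Writer writer;

    /** Step 1: a leftover writer is detected with a plain null check. */
    void start(long checkpointId) {
        if (checkpointId <= ongoingCheckpointId) {
            throw new IllegalStateException("Checkpoint id must increase");
        }
        if (writer != null) {
            writer.failed = true; // previous checkpoint was neither finished nor aborted
        }
        ongoingCheckpointId = checkpointId;
        writer = new Writer(checkpointId);
    }

    /** Steps 2-3: a dedicated abort drops the writer and remembers the aborted id. */
    void abort(long checkpointId) {
        if (checkpointId == ongoingCheckpointId) {
            writer = null;
            abortedCheckpointId = ongoingCheckpointId;
        }
    }

    /** Step 4: the writer is only visible for the ongoing, non-aborted checkpoint. */
    Writer lookup(long checkpointId) {
        return ongoingCheckpointId == checkpointId && abortedCheckpointId != checkpointId
                ? writer
                : null;
    }

    /** Returns true if the write was applied, false if it was skipped as a late write. */
    boolean write(long checkpointId) {
        Writer w = lookup(checkpointId);
        if (w != null) {
            w.writes++;
            return true;
        }
        // Assumption: writes for an aborted or already superseded checkpoint are skipped.
        if (checkpointId == abortedCheckpointId || checkpointId < ongoingCheckpointId) {
            return false;
        }
        throw new IllegalArgumentException("writer not found for checkpoint " + checkpointId);
    }

    public static void main(String[] args) {
        AbortAwareDispatcherSketch d = new AbortAwareDispatcherSketch();
        d.start(1);
        System.out.println(d.write(1)); // applied to the writer of checkpoint 1
        d.abort(1);
        System.out.println(d.write(1)); // skipped: checkpoint 1 was aborted
        d.start(2);
        System.out.println(d.write(2)); // applied to the new writer
    }
}
```

With this shape, the validity of the writer is decided only by fields of the dispatcher itself, and `writer.isDone()` is no longer consulted.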
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -98,18 +120,26 @@ private ChannelStateCheckpointWriter buildWriter(CheckpointStartRequest request)
streamFactoryResolver.resolveCheckpointStorageLocation(
request.getCheckpointId(),
request.getLocationReference()),
serializer,
- () -> writers.remove(request.getCheckpointId()));
+ () -> {
+ checkState(
+ request.getCheckpointId() == ongoingCheckpointId,
+ "The ongoingCheckpointId[%s] was changed when clear writer of checkpoint[%s], it might be a bug.",
+ ongoingCheckpointId,
+ request.getCheckpointId());
+ this.writer = null;
+ });
Review Comment:
> I don't think so. Actually, all abort, start, writeInput, finishInput,
> writeOutput or finishOutput are executed in the Channel state writer Executor
> Thread. It is single thread, so the race condition isn't exist.
I was thinking about something else, but I see now that I was wrong anyway.
The abort call would not be executed, because the writer returned from
```
ChannelStateCheckpointWriter writer =
        ongoingCheckpointId == request.getCheckpointId() ? this.writer : null;
```
would be `null`, and the `abort` request itself has `ignoreMissingWriter` set to
`true`.
If checkpoint 42 is completed because a `CheckpointBarrier` was received,
`ChannelStateWriter#finishInput` would actually work fine, and this
`checkState` would pass.
Long story short: in the newly proposed version, as long as
`ongoingCheckpointId` is set correctly, all of the writes, input/output
completions, and aborts will be handled correctly, either ignored if they do
not refer to the ongoing checkpoint, or actually processed. Is that right?
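The lookup-then-ignore behaviour described above can be sketched roughly as follows. This is a simplified model, not the Flink implementation: `MissingWriterSketch`, `Writer`, `Request`, and the `abort`/`write` factories are hypothetical. The only details taken from this discussion are the lookup expression and that abort requests have `ignoreMissingWriter` set to `true`; giving write requests `ignoreMissingWriter = false` is an assumption made for illustration.

```java
import java.util.function.Consumer;

/** Simplified model of routing requests by ongoingCheckpointId; not Flink code. */
public class MissingWriterSketch {

    /** Hypothetical stand-in for ChannelStateCheckpointWriter. */
    static final class Writer {
        boolean aborted;
        int writes;
    }

    /** Hypothetical stand-in for a write/abort request. */
    static final class Request {
        final long checkpointId;
        final boolean ignoreMissingWriter;
        final Consumer<Writer> action;

        Request(long checkpointId, boolean ignoreMissingWriter, Consumer<Writer> action) {
            this.checkpointId = checkpointId;
            this.ignoreMissingWriter = ignoreMissingWriter;
            this.action = action;
        }

        void onWriterMissing() {
            if (!ignoreMissingWriter) {
                throw new IllegalArgumentException(
                        "writer not found for checkpoint " + checkpointId);
            }
        }
    }

    static Request abort(long checkpointId) {
        return new Request(checkpointId, true, w -> w.aborted = true);
    }

    static Request write(long checkpointId) {
        // Assumption: a write with no matching writer is treated as an error.
        return new Request(checkpointId, false, w -> w.writes++);
    }

    long ongoingCheckpointId = -1;
    Writer writer;

    /** Returns true if the request reached the writer, false if it was ignored. */
    boolean dispatch(Request request) {
        Writer w = ongoingCheckpointId == request.checkpointId ? this.writer : null;
        if (w == null) {
            request.onWriterMissing();
            return false;
        }
        request.action.accept(w);
        return true;
    }

    public static void main(String[] args) {
        MissingWriterSketch d = new MissingWriterSketch();
        d.ongoingCheckpointId = 42;
        d.writer = new Writer();
        System.out.println(d.dispatch(abort(41))); // stale abort never touches the writer
        System.out.println(d.dispatch(write(42))); // write for the ongoing checkpoint
    }
}
```

In this model a stale abort for checkpoint 41 cannot affect the writer of checkpoint 42, which is the property the single `ongoingCheckpointId` comparison is meant to provide.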
> Map<Long, ChannelStateCheckpointWriter> writers; cannot be cleaned up
> until the end of the Task.
👍 Yeah, I've noticed.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -73,11 +79,27 @@ public void dispatch(ChannelStateWriteRequest request) throws Exception {
private void dispatchInternal(ChannelStateWriteRequest request) throws Exception {
if (request instanceof CheckpointStartRequest) {
checkState(
- !writers.containsKey(request.getCheckpointId()),
- "writer not found for request " + request);
- writers.put(request.getCheckpointId(), buildWriter((CheckpointStartRequest) request));
+ request.getCheckpointId() > ongoingCheckpointId,
+ String.format(
+ "Checkpoint must be incremented, ongoingCheckpointId is %s, but the request is %s.",
+ ongoingCheckpointId, request));
+ if (writer != null && !writer.isDone()) {
+ writer.fail(
+ new IllegalStateException(
+ String.format(
+ "Task[name=%s, subtaskIndex=%s] has uncompleted channelState writer of checkpointId=%s, "
+ + "but it received a new checkpoint start request of checkpointId=%s, it maybe "
+ + "a bug due to currently not supported concurrent unaligned checkpoint.",
+ taskName,
+ subtaskIndex,
+ ongoingCheckpointId,
+ request.getCheckpointId())));
+ }
Review Comment:
> I have added the
> ChannelStateWriterImplTest.testAbortOldAndStartNewCheckpoint(). It is:
I'm not sure that's the correct level to test it, because what actually
matters is how `SubtaskCheckpointCoordinatorImpl` uses the
`ChannelStateWriter`. Can we implement this test using
`SubtaskCheckpointCoordinatorImpl` (in `SubtaskCheckpointCoordinatorTest`)?