pnowojski commented on code in PR #20233:
URL: https://github.com/apache/flink/pull/20233#discussion_r930700904
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java:
##########
@@ -157,6 +151,14 @@ public void addInputData(
checkpointId,
info,
startSeqNum);
+ if (isCheckpointSubsumedOrAborted(checkpointId)) {
+ LOG.debug(
+ "The checkpoint {} of task {} has been aborted, so don't addInputData.",
+ checkpointId,
+ taskName);
+ closeBuffers(iterator);
+ return;
+ }
enqueue(write(checkpointId, info, iterator), false);
Review Comment:
What happens if there is a race between this `addInputData` call from a Netty
thread and, for example, an `abort` call?
1. (Netty thread) calls `addInputData`
2. (Netty thread) checks `isCheckpointSubsumedOrAborted`, which returns false
3. (Netty thread) freezes for 1 ms
4. (Task thread) calls `abort()`
5. (Netty thread) wakes up and finishes the `addInputData` method by calling
`enqueue(write(checkpointId, info, iterator), false);`
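The five steps above form a classic check-then-act race. The sketch below replays them deterministically under stated assumptions: `ToyWriter` is a hypothetical, simplified stand-in (not Flink's `ChannelStateWriterImpl`), its `isAborted` plays the role of `isCheckpointSubsumedOrAborted`, and two `CountDownLatch`es replace the 1 ms pause. Although the check itself takes a lock, the write is still enqueued after the abort, because the lock is not held between the check and the `enqueue`.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

public class CheckThenActRace {

    /** Hypothetical, simplified writer; not Flink's ChannelStateWriterImpl. */
    static final class ToyWriter {
        private final Object lock = new Object();
        private final Set<Long> aborted = new HashSet<>();
        private final List<String> queue = new ArrayList<>();

        // Stands in for isCheckpointSubsumedOrAborted: correct only at the instant of the call.
        boolean isAborted(long checkpointId) {
            synchronized (lock) {
                return aborted.contains(checkpointId);
            }
        }

        void abort(long checkpointId) {
            synchronized (lock) {
                aborted.add(checkpointId);
            }
        }

        void enqueue(String request) {
            synchronized (lock) {
                queue.add(request);
            }
        }

        List<String> queued() {
            synchronized (lock) {
                return new ArrayList<>(queue);
            }
        }
    }

    /** Replays steps 1-5 deterministically and returns what ended up in the queue. */
    static List<String> replayInterleaving() {
        ToyWriter writer = new ToyWriter();
        long checkpointId = 42L;
        CountDownLatch checkDone = new CountDownLatch(1);
        CountDownLatch abortDone = new CountDownLatch(1);

        Thread netty = new Thread(() -> {
            // Steps 1-2: the check runs before abort() and sees "not aborted".
            boolean skip = writer.isAborted(checkpointId);
            checkDone.countDown();
            try {
                // Step 3: the "freeze", made deterministic with a latch instead of a sleep.
                abortDone.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            // Step 5: act on the now-stale result of the check.
            if (!skip) {
                writer.enqueue("write(" + checkpointId + ")");
            }
        }, "netty");

        try {
            netty.start();
            checkDone.await();
            // Step 4: the task thread aborts the checkpoint.
            writer.abort(checkpointId);
            abortDone.countDown();
            netty.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return writer.queued();
    }

    public static void main(String[] args) {
        // Prints [write(42)]: data for checkpoint 42 was enqueued after it was aborted.
        System.out.println(replayInterleaving());
    }
}
```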
Will this lead to a resource leak? If not, then why do we need the `if
(isCheckpointSubsumedOrAborted(checkpointId))` check here in the first place?
After all, `isCheckpointSubsumedOrAborted` is quite costly due to its lock
acquisition.
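One way such a late write would not leak is if the single thread draining the request queue owns every enqueued buffer and recycles buffers for aborted checkpoints, both those already accepted and those arriving afterwards. The sketch below models that assumption only; `ToyDispatcher`, `Request` and `ToyBuffer` are hypothetical names, not Flink's actual classes, and whether Flink's writer behaves this way is exactly the reviewer's question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ConsumerSideCleanup {

    /** Hypothetical stand-in for a pooled network buffer. */
    static final class ToyBuffer {
        private boolean recycled;

        void recycle() {
            recycled = true;
        }

        boolean isRecycled() {
            return recycled;
        }
    }

    /** Hypothetical request: a write carries a buffer, an abort does not. */
    static final class Request {
        final long checkpointId;
        final ToyBuffer buffer;

        private Request(long checkpointId, ToyBuffer buffer) {
            this.checkpointId = checkpointId;
            this.buffer = buffer;
        }

        static Request write(long checkpointId, ToyBuffer buffer) {
            return new Request(checkpointId, buffer);
        }

        static Request abort(long checkpointId) {
            return new Request(checkpointId, null);
        }
    }

    /** Processes requests on a single thread, so it needs no locking of its own. */
    static final class ToyDispatcher {
        private final Set<Long> aborted = new HashSet<>();
        private final Map<Long, List<ToyBuffer>> pending = new HashMap<>();

        void process(Request request) {
            if (request.buffer == null) {
                // Abort: remember it and release every buffer already accepted for it.
                aborted.add(request.checkpointId);
                List<ToyBuffer> buffers = pending.remove(request.checkpointId);
                if (buffers != null) {
                    buffers.forEach(ToyBuffer::recycle);
                }
            } else if (aborted.contains(request.checkpointId)) {
                // Late write that raced with abort(): drop the data, free the buffer.
                request.buffer.recycle();
            } else {
                pending.computeIfAbsent(request.checkpointId, id -> new ArrayList<>())
                        .add(request.buffer);
            }
        }

        int pendingBuffers() {
            return pending.values().stream().mapToInt(List::size).sum();
        }
    }

    public static void main(String[] args) {
        ToyBuffer early = new ToyBuffer();
        ToyBuffer late = new ToyBuffer();
        ToyDispatcher dispatcher = new ToyDispatcher();
        // The late write reaches the queue after the abort, as in steps 1-5.
        for (Request r : Arrays.asList(
                Request.write(42L, early), Request.abort(42L), Request.write(42L, late))) {
            dispatcher.process(r);
        }
        // Prints "true true 0": no buffer is left behind.
        System.out.println(early.isRecycled() + " " + late.isRecycled() + " "
                + dispatcher.pendingBuffers());
    }
}
```

Under this assumption the producer-side check would only save some work for late data. Without consumer-side handling of this kind, the check narrows the race window but cannot close it.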