zhijiangW commented on a change in pull request #11687:
URL: https://github.com/apache/flink/pull/11687#discussion_r422029939
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
##########
@@ -177,17 +178,18 @@ void sendTaskEvent(TaskEvent event) {
@Override
boolean isReleased() {
- return isReleased;
+ return isReleased.get();
}
void releaseAllResources() throws IOException {
- ArrayDeque<Buffer> releasedBuffers = new ArrayDeque<>();
- synchronized (receivedBuffers) {
- releasedBuffers.addAll(receivedBuffers);
- receivedBuffers.clear();
- isReleased = true;
+ if (isReleased.compareAndSet(false, true)) {
Review comment:
In `RemoteInputChannel` we also had the atomic released variable and I
guess here is the similar case. The release operation might be called not only
by task thread, but also by canceler thread. If the latter, it might has
visibility issue for task thread while interacting with
`RecoveredInputChannel#isReleased` in `BufferManager` related operations.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]