zhijiangW commented on a change in pull request #11687:
URL: https://github.com/apache/flink/pull/11687#discussion_r422055218
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
##########
@@ -177,17 +178,18 @@ void sendTaskEvent(TaskEvent event) {
@Override
boolean isReleased() {
- return isReleased;
+ return isReleased.get();
}
void releaseAllResources() throws IOException {
- ArrayDeque<Buffer> releasedBuffers = new ArrayDeque<>();
- synchronized (receivedBuffers) {
- releasedBuffers.addAll(receivedBuffers);
- receivedBuffers.clear();
- isReleased = true;
+ if (isReleased.compareAndSet(false, true)) {
Review comment:
It seems we handle this issue differently in different classes.
`LocalInputChannel` uses a `volatile` field for this variable,
`RemoteInputChannel` uses an atomic variable, and `RecoveredInputChannel`
uses a plain boolean variable.
We might want to unify them in a separate commit in the future.
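To illustrate what a unified approach could look like, here is a minimal sketch built on the `AtomicBoolean` pattern from this diff. `ReleasableChannelSketch`, `addBuffer` and `releasedBufferCount` are hypothetical names made up for the example, and `String` stands in for `Buffer`; none of this is the actual Flink code.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for an input channel; not Flink's actual class. */
class ReleasableChannelSketch {
    // One flag type for every channel: readable without a lock and
    // flipped exactly once via compareAndSet.
    private final AtomicBoolean isReleased = new AtomicBoolean(false);

    // String is a placeholder for the real Buffer type.
    private final ArrayDeque<String> receivedBuffers = new ArrayDeque<>();

    // Illustration only: counts buffers handed back during release.
    private int releasedBufferCount = 0;

    boolean isReleased() {
        return isReleased.get();
    }

    void addBuffer(String buffer) {
        synchronized (receivedBuffers) {
            // Checked under the lock so that no buffer is queued after
            // the release path has drained the queue.
            if (!isReleased.get()) {
                receivedBuffers.add(buffer);
            }
        }
    }

    void releaseAllResources() {
        // Only the first caller wins the CAS; later calls are no-ops.
        if (isReleased.compareAndSet(false, true)) {
            ArrayDeque<String> releasedBuffers;
            synchronized (receivedBuffers) {
                releasedBuffers = new ArrayDeque<>(receivedBuffers);
                receivedBuffers.clear();
            }
            // Real code would recycle each buffer here, outside the lock.
            releasedBufferCount += releasedBuffers.size();
        }
    }

    int releasedBufferCount() {
        return releasedBufferCount;
    }
}
```

With this shape a second call to `releaseAllResources()` does nothing, and `isReleased()` needs neither a lock nor a `volatile` field. A `volatile boolean` would also make the flag visible across threads, but it would not by itself guarantee that the release logic runs only once.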