dawidwys commented on a change in pull request #17663: URL: https://github.com/apache/flink/pull/17663#discussion_r743601715
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java ########## @@ -338,10 +338,13 @@ void releaseAllResources() throws IOException { @Override void announceBufferSize(int newBufferSize) { - checkState(!isReleased, "Channel released."); - - ResultSubpartitionView subpartitionView = checkNotNull(this.subpartitionView); - subpartitionView.notifyNewBufferSize(newBufferSize); + ResultSubpartitionView view = this.subpartitionView; + // if releaseAllResources would be called from the mailbox thread it is possible that Review comment: > But my current solution considers that it is impossible to call announceBufferSize and releaseAllResources in parallel Please confirm that this may happen only when cancelling or failing a task externally, because we `closeAllInputGates` from the `TaskCanceler` thread. I've looked into it and I think that: 1. It's the same/similar for all of the methods. The cancellation may happen at any given time, so taking e.g. `RemoteInputChannel#resumeConsumption` as an example, the channel might be released after the `checkState` condition. 2. It is rather unlikely to cause problems. It very much depends on timing. 3. Even if we fail (either on the `checkState` or while sending messages to the channel (I believe it might happen. I don't know which _listener_ you had in mind) it should not matter much. We are failing anyway and this exception should be ignored. We are already in FAILED/CANCELLING state (see `Task:765-821`). Please let me know what you think. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ########## @@ -396,12 +396,18 @@ int getBuffersInUseCount() { @VisibleForTesting public void announceBufferSize(int newBufferSize) { for (InputChannel channel : channels) { - channel.announceBufferSize(newBufferSize); + if (!channel.isReleased()) { + channel.announceBufferSize(newBufferSize); + } } } @Override public void triggerDebloating() { + if (isFinished()) { Review comment: How about?: ``` if (isFinished() || closeFuture.isDone()) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org