dawidwys commented on a change in pull request #17663:
URL: https://github.com/apache/flink/pull/17663#discussion_r743601715



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
##########
@@ -338,10 +338,13 @@ void releaseAllResources() throws IOException {
 
     @Override
     void announceBufferSize(int newBufferSize) {
-        checkState(!isReleased, "Channel released.");
-
-        ResultSubpartitionView subpartitionView = checkNotNull(this.subpartitionView);
-        subpartitionView.notifyNewBufferSize(newBufferSize);
+        ResultSubpartitionView view = this.subpartitionView;
+        // if releaseAllResources would be called from the mailbox thread it is possible that

Review comment:
       > But my current solution considers that it is impossible to call announceBufferSize and releaseAllResources in parallel
   
   Please confirm that this can happen only when a task is cancelled or failed externally, because we call `closeAllInputGates` from the `TaskCanceler` thread. I've looked into it and I think that:
   1. The same applies to all of the methods. Cancellation may happen at any time, so taking `RemoteInputChannel#resumeConsumption` as an example, the channel might be released right after the `checkState` condition passes.
   2. It is rather unlikely to cause problems, since it depends very much on timing.
   3. Even if we fail, either on the `checkState` or while sending messages to the channel (I believe that might happen; I don't know which _listener_ you had in mind), it should not matter much. We are failing anyway and this exception should be ignored, because we are already in the FAILED/CANCELLING state (see `Task:765-821`).
   
   Please let me know what you think.
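
To make the race in point 1 concrete, here is a minimal, self-contained sketch of the "read the field once, then null-check" pattern that the new `announceBufferSize` in the diff appears to be heading toward. `SketchView`, `SketchLocalChannel` and `AnnounceSketchDemo` are hypothetical stand-ins, not Flink classes, and the `volatile` field and the `boolean` return value are assumptions made only for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for ResultSubpartitionView; not the Flink interface. */
interface SketchView {
    void notifyNewBufferSize(int newBufferSize);
}

/** Hypothetical, simplified channel with only the two methods under discussion. */
final class SketchLocalChannel {
    // Assumption: volatile, so a release from another thread becomes visible here.
    private volatile SketchView subpartitionView;

    SketchLocalChannel(SketchView view) {
        this.subpartitionView = view;
    }

    /** May be called from a canceler thread at any time. */
    void releaseAllResources() {
        subpartitionView = null;
    }

    /** Returns true if the size was announced, false if the channel was already released. */
    boolean announceBufferSize(int newBufferSize) {
        // Read the field exactly once; a second read could observe null after the check.
        SketchView view = this.subpartitionView;
        if (view == null) {
            return false; // released concurrently: skip instead of throwing
        }
        view.notifyNewBufferSize(newBufferSize);
        return true;
    }
}

final class AnnounceSketchDemo {
    public static void main(String[] args) {
        AtomicInteger lastSize = new AtomicInteger(-1);
        SketchLocalChannel channel = new SketchLocalChannel(lastSize::set);

        System.out.println(channel.announceBufferSize(1024)); // announced before release
        channel.releaseAllResources();
        System.out.println(channel.announceBufferSize(2048)); // skipped after release
    }
}
```

With a `checkState(!isReleased)` followed by a second read of the field, a release between the two steps would surface as an exception. With the single read, the call only becomes a no-op. Either outcome should be harmless if the task is already failing or being cancelled.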

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
##########
@@ -396,12 +396,18 @@ int getBuffersInUseCount() {
     @VisibleForTesting
     public void announceBufferSize(int newBufferSize) {
         for (InputChannel channel : channels) {
-            channel.announceBufferSize(newBufferSize);
+            if (!channel.isReleased()) {
+                channel.announceBufferSize(newBufferSize);
+            }
         }
     }
 
     @Override
     public void triggerDebloating() {
+        if (isFinished()) {

Review comment:
       How about:
   ```
           if (isFinished() || closeFuture.isDone()) {
   ```
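
A minimal sketch of how that guard would behave, assuming `closeFuture` is a `CompletableFuture<Void>` that gets completed when the gate is closed. `SketchInputGate`, `markFinished`, `close` and the `debloatRuns` counter are hypothetical names used only for this illustration, not the actual `SingleInputGate` members.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified gate; only the guard from the suggestion is modelled. */
final class SketchInputGate {
    // Assumption: completed once the gate has been closed.
    private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
    private boolean finished;
    private int debloatRuns;

    boolean isFinished() {
        return finished;
    }

    void markFinished() {
        finished = true;
    }

    void close() {
        closeFuture.complete(null);
    }

    void triggerDebloating() {
        // Skip debloating once the gate is finished or has been closed.
        if (isFinished() || closeFuture.isDone()) {
            return;
        }
        debloatRuns++; // placeholder for the real debloating work
    }

    int debloatRuns() {
        return debloatRuns;
    }
}
```

The extra `closeFuture.isDone()` check covers a gate that has been closed without ever reaching the finished state, for example on cancellation.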





