dawidwys commented on a change in pull request #17663:
URL: https://github.com/apache/flink/pull/17663#discussion_r742701577



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
##########
@@ -338,10 +338,13 @@ void releaseAllResources() throws IOException {
 
     @Override
     void announceBufferSize(int newBufferSize) {
-        checkState(!isReleased, "Channel released.");
-
-        ResultSubpartitionView subpartitionView = checkNotNull(this.subpartitionView);
-        subpartitionView.notifyNewBufferSize(newBufferSize);
+        ResultSubpartitionView view = this.subpartitionView;
+        // if releaseAllResources would be called from the mailbox thread it is possible that

Review comment:
       I am not entirely sure the description here is correct or, better said, precise.
   
   I don't believe that calling any of these methods from the mailbox thread is the cause of the problem. Correct me if I am wrong, but I believe the problem is exactly the same as the one described in `LocalInputChannel:198-204`.
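
For illustration, here is a minimal sketch of the pattern the added diff lines appear to use: read the field into a local once, then null-check the local instead of asserting that it is non-null. `View`, `SketchChannel`, and `SketchDemo` are hypothetical stand-ins, not Flink classes, and the `boolean` return value exists only to make the outcome observable in the demo.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a subpartition view; not Flink's real interface. */
interface View {
    void notifyNewBufferSize(int newBufferSize);
}

/** Hypothetical, simplified channel; only the null-handling pattern matters here. */
class SketchChannel {
    // Assumption: one thread may clear this field while another announces a size.
    private volatile View subpartitionView;

    SketchChannel(View view) {
        this.subpartitionView = view;
    }

    void releaseAllResources() {
        subpartitionView = null;
    }

    /** Returns true if the announcement was delivered, false if the view was already gone. */
    boolean announceBufferSize(int newBufferSize) {
        // Read the field once so the null check and the call use the same reference.
        View view = this.subpartitionView;
        if (view == null) {
            // Released (or never set): skip the announcement instead of throwing.
            return false;
        }
        view.notifyNewBufferSize(newBufferSize);
        return true;
    }
}

class SketchDemo {
    public static void main(String[] args) {
        AtomicInteger lastSize = new AtomicInteger(-1);
        SketchChannel channel = new SketchChannel(lastSize::set);

        System.out.println(channel.announceBufferSize(1024)); // delivered
        channel.releaseAllResources();
        System.out.println(channel.announceBufferSize(2048)); // skipped after release
    }
}
```

Whether silently skipping the announcement is the right behaviour depends on which race is actually being guarded against, which is the point of the question above.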

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
##########
@@ -338,10 +338,13 @@ void releaseAllResources() throws IOException {
 
     @Override
     void announceBufferSize(int newBufferSize) {
-        checkState(!isReleased, "Channel released.");
-
-        ResultSubpartitionView subpartitionView = checkNotNull(this.subpartitionView);
-        subpartitionView.notifyNewBufferSize(newBufferSize);
+        ResultSubpartitionView view = this.subpartitionView;
+        // if releaseAllResources would be called from the mailbox thread it is possible that
+        // announceBufferSize would be called after it.

Review comment:
       ```suggestion
           // if releaseAllResources was called from the mailbox thread it is possible that
           // announceBufferSize would be called after it.
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
##########
@@ -338,10 +338,13 @@ void releaseAllResources() throws IOException {
 
     @Override
     void announceBufferSize(int newBufferSize) {
-        checkState(!isReleased, "Channel released.");
-
-        ResultSubpartitionView subpartitionView = checkNotNull(this.subpartitionView);
-        subpartitionView.notifyNewBufferSize(newBufferSize);
+        ResultSubpartitionView view = this.subpartitionView;

Review comment:
       I guess you can move that inside of the `if` block.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
##########
@@ -338,10 +338,13 @@ void releaseAllResources() throws IOException {
 
     @Override
     void announceBufferSize(int newBufferSize) {
-        checkState(!isReleased, "Channel released.");
-
-        ResultSubpartitionView subpartitionView = checkNotNull(this.subpartitionView);
-        subpartitionView.notifyNewBufferSize(newBufferSize);
+        ResultSubpartitionView view = this.subpartitionView;
+        // if releaseAllResources would be called from the mailbox thread it is possible that

Review comment:
       BTW, don't we have a bigger problem: we try to perform debloating even if a `SingleInputGate` might have already finished? What happens if we have two input gates and one of them finishes early (receives the end of partition) while the other keeps processing records continuously? Could you try that out?



