wsry commented on pull request #16844:
URL: https://github.com/apache/flink/pull/16844#issuecomment-900828858


   > One last question:
   > 
   > What if we move the change to `releaseInternal`, like this. In this way, 
we do not need for refacotring or changing fail(). WDYT?
   > 
   > ```
   >    @Override
   >     protected void releaseInternal() {
   > 
   >       if (broadcastBufferBuilder != null) {
   >             broadcastBufferBuilder.close();
   >             broadcastBufferBuilder = null;
   >         }
   >         for (int i = 0; i < unicastBufferBuilders.length; ++i) {
   >             if (unicastBufferBuilders[i] != null) {
   >                 unicastBufferBuilders[i].close();
   >                 unicastBufferBuilders[i] = null;
   >             }
   >         } 
   >         // Release all subpartitions
   >         for (ResultSubpartition subpartition : subpartitions) {
   >             try {
   >                 subpartition.release();
   >             }
   >             // Catch this in order to ensure that release is called on all 
subpartitions
   >             catch (Throwable t) {
   >                 LOG.error("Error during release of result subpartition: " 
+ t.getMessage(), t);
   >             }
   >         }   
   >     }
   > ```
   
   I guess the release method can also be called from other threads? Then we 
may still have concurrency issue.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to