pnowojski commented on a change in pull request #8925: [FLINK-12852][network] 
Fix the deadlock occured when requesting exclusive buffers
URL: https://github.com/apache/flink/pull/8925#discussion_r302109362
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
 ##########
 @@ -186,6 +203,22 @@ public void recycle(MemorySegment segment) {
                                if (segment != null) {
                                        segments.add(segment);
                                }
+
 
 Review comment:
   This method has grown a bit. Could we extract some parts of it to separate 
methods?  Maybe put
   ```
                        if (numTotalRequiredBuffers + numberOfSegmentsToRequest 
> totalNumberOfMemorySegments) {
                                throw new 
IOException(String.format("Insufficient number of network buffers: " +
                                                                "required %d, 
but only %d available. The total number of network " +
                                                                "buffers is 
currently set to %d of %d bytes each. You can increase this " +
                                                                "number by 
setting the configuration keys '%s', '%s', and '%s'.",
                                        numberOfSegmentsToRequest,
                                                totalNumberOfMemorySegments - 
numTotalRequiredBuffers,
                                                totalNumberOfMemorySegments,
                                                memorySegmentSize,
                                                
NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
                                                
NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
                                                
NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.key()));
                        }
   
                        try {
                                redistributeBuffers();
                        } catch (Throwable t) {
                                this.numTotalRequiredBuffers -= 
numberOfSegmentsToRequest;
   
                                try {
                                        redistributeBuffers();
                                } catch (IOException inner) {
                                        t.addSuppressed(inner);
                                }
                                ExceptionUtils.rethrowIOException(t);
                        }
   ``` 
   into a separate `tryRedistributeBuffers()` method?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to