This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 855348a  [FLINK-25741][runtime] Skip buffer pools which have no 
floating buffer in buffer redistributing.
855348a is described below

commit 855348a9ac482f141131c302b21d4a67e4c3e0c7
Author: Yangze Guo <[email protected]>
AuthorDate: Thu Dec 30 17:27:26 2021 +0800

    [FLINK-25741][runtime] Skip buffer pools which have no floating buffer in 
buffer redistributing.
    
    This closes #18433.
---
 .../io/network/buffer/NetworkBufferPool.java        | 21 +++++++++++++++++----
 1 file changed, 17 insertions(+), 4 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 19e4cb8..ffdb9c7 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -83,6 +83,8 @@ public class NetworkBufferPool
 
     private final Set<LocalBufferPool> allBufferPools = new HashSet<>();
 
+    private final Set<LocalBufferPool> resizableBufferPools = new HashSet<>();
+
     private int numTotalRequiredBuffers;
 
     private final Duration requestSegmentsTimeout;
@@ -500,6 +502,10 @@ public class NetworkBufferPool
 
             allBufferPools.add(localBufferPool);
 
+            if (numRequiredBuffers < maxUsedBuffers) {
+                resizableBufferPools.add(localBufferPool);
+            }
+
             redistributeBuffers();
 
             return localBufferPool;
@@ -515,6 +521,7 @@ public class NetworkBufferPool
         synchronized (factoryLock) {
             if (allBufferPools.remove(bufferPool)) {
                 numTotalRequiredBuffers -= 
bufferPool.getNumberOfRequiredMemorySegments();
+                resizableBufferPools.remove(bufferPool);
 
                 redistributeBuffers();
             }
@@ -536,7 +543,9 @@ public class NetworkBufferPool
             }
 
             // some sanity checks
-            if (allBufferPools.size() > 0 || numTotalRequiredBuffers > 0) {
+            if (allBufferPools.size() > 0
+                    || numTotalRequiredBuffers > 0
+                    || resizableBufferPools.size() > 0) {
                 throw new IllegalStateException(
                         "NetworkBufferPool is not empty after destroying all 
LocalBufferPools");
             }
@@ -573,12 +582,16 @@ public class NetworkBufferPool
     private void redistributeBuffers() {
         assert Thread.holdsLock(factoryLock);
 
+        if (resizableBufferPools.isEmpty()) {
+            return;
+        }
+
         // All buffers, which are not among the required ones
         final int numAvailableMemorySegment = totalNumberOfMemorySegments - 
numTotalRequiredBuffers;
 
         if (numAvailableMemorySegment == 0) {
             // in this case, we need to redistribute buffers so that every 
pool gets its minimum
-            for (LocalBufferPool bufferPool : allBufferPools) {
+            for (LocalBufferPool bufferPool : resizableBufferPools) {
                 
bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments());
             }
             return;
@@ -594,7 +607,7 @@ public class NetworkBufferPool
 
         long totalCapacity = 0; // long to avoid int overflow
 
-        for (LocalBufferPool bufferPool : allBufferPools) {
+        for (LocalBufferPool bufferPool : resizableBufferPools) {
             int excessMax =
                     bufferPool.getMaxNumberOfMemorySegments()
                             - bufferPool.getNumberOfRequiredMemorySegments();
@@ -614,7 +627,7 @@ public class NetworkBufferPool
 
         long totalPartsUsed = 0; // of totalCapacity
         int numDistributedMemorySegment = 0;
-        for (LocalBufferPool bufferPool : allBufferPools) {
+        for (LocalBufferPool bufferPool : resizableBufferPools) {
             int excessMax =
                     bufferPool.getMaxNumberOfMemorySegments()
                             - bufferPool.getNumberOfRequiredMemorySegments();

Reply via email to