This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7784ec7284bfc26a2561d8cd9ec3ab4aead6c104
Author: kevin.cyj <[email protected]>
AuthorDate: Thu Dec 30 21:56:15 2021 +0800

    [FLINK-25407][network] Fix the issues caused by FLINK-24035
    
    This PR tries to fix the issues caused by FLINK-24035. More specifically, 
there are two issues: the first is a deadlock caused by acquiring the 
'factoryLock' in NetworkBufferPool, and the second is an incorrect decrease of 
the required segments of NetworkBufferPool. Both issues occur during exception 
handling when requesting segments. In fact, when reserving memory segments for 
LocalBufferPool, there is no need to modify the value of required segments. As 
a result, there is no n [...]
    
    This closes #18173.
---
 .../io/network/buffer/NetworkBufferPool.java       | 17 ++++---
 .../io/network/buffer/LocalBufferPoolTest.java     | 59 ++++++++++++++++++++++
 2 files changed, 70 insertions(+), 6 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 509db03..d9717a4 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -208,7 +208,13 @@ public class NetworkBufferPool
             tryRedistributeBuffers(numberOfSegmentsToRequest);
         }
 
-        return internalRequestMemorySegments(numberOfSegmentsToRequest);
+        try {
+            return internalRequestMemorySegments(numberOfSegmentsToRequest);
+        } catch (IOException exception) {
+            revertRequiredBuffers(numberOfSegmentsToRequest);
+            ExceptionUtils.rethrowIOException(exception);
+            return null;
+        }
     }
 
     private List<MemorySegment> internalRequestMemorySegments(int 
numberOfSegmentsToRequest)
@@ -248,7 +254,7 @@ public class NetworkBufferPool
                 }
             }
         } catch (Throwable e) {
-            recycleMemorySegments(segments, numberOfSegmentsToRequest);
+            internalRecycleMemorySegments(segments);
             ExceptionUtils.rethrowIOException(e);
         }
 
@@ -272,12 +278,11 @@ public class NetworkBufferPool
      */
     @Override
     public void recycleUnpooledMemorySegments(Collection<MemorySegment> 
segments) {
-        recycleMemorySegments(segments, segments.size());
-    }
-
-    private void recycleMemorySegments(Collection<MemorySegment> segments, int 
size) {
         internalRecycleMemorySegments(segments);
+        revertRequiredBuffers(segments.size());
+    }
 
+    private void revertRequiredBuffers(int size) {
         synchronized (factoryLock) {
             numTotalRequiredBuffers -= size;
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index b4b56fe..57edaca 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -135,6 +135,65 @@ public class LocalBufferPoolTest extends TestLogger {
         }
     }
 
+    @Test(timeout = 10000) // timeout can indicate a potential deadlock
+    public void testReserveSegmentsAndCancel() throws Exception {
+        int totalSegments = 4;
+        int segmentsToReserve = 2;
+
+        NetworkBufferPool globalPool = new NetworkBufferPool(totalSegments, 
memorySegmentSize);
+        BufferPool localPool1 = globalPool.createBufferPool(segmentsToReserve, 
totalSegments);
+        List<MemorySegment> segments = new ArrayList<>();
+
+        try {
+            for (int i = 0; i < totalSegments; ++i) {
+                segments.add(localPool1.requestMemorySegmentBlocking());
+            }
+
+            BufferPool localPool2 = 
globalPool.createBufferPool(segmentsToReserve, totalSegments);
+            // the segment reserve thread will be blocked for no buffer is 
available
+            Thread reserveThread =
+                    new Thread(
+                            () -> {
+                                try {
+                                    
localPool2.reserveSegments(segmentsToReserve);
+                                } catch (Throwable ignored) {
+                                }
+                            });
+            reserveThread.start();
+            Thread.sleep(100); // wait to be blocked
+
+            // the cancel thread can be blocked when redistributing buffers
+            Thread cancelThread =
+                    new Thread(
+                            () -> {
+                                localPool1.lazyDestroy();
+                                localPool2.lazyDestroy();
+                            });
+            cancelThread.start();
+
+            // it is expected that the segment reserve thread can be cancelled 
successfully
+            Thread interruptThread =
+                    new Thread(
+                            () -> {
+                                try {
+                                    do {
+                                        reserveThread.interrupt();
+                                        Thread.sleep(100);
+                                    } while (reserveThread.isAlive() || 
cancelThread.isAlive());
+                                } catch (Throwable ignored) {
+                                }
+                            });
+            interruptThread.start();
+
+            interruptThread.join();
+        } finally {
+            segments.forEach(localPool1::recycle);
+            localPool1.lazyDestroy();
+            assertEquals(0, globalPool.getNumberOfUsedMemorySegments());
+            globalPool.destroy();
+        }
+    }
+
     @Test
     public void testRequestMoreThanAvailable() {
         localBufferPool.setNumBuffers(numBuffers);

Reply via email to