This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new aa7baa5  [FLINK-22946][runtime] Recycle floating buffer outside the 
lock to avoid deadlock
aa7baa5 is described below

commit aa7baa5bd9d4ff1792c2e4e396fbc8564d71004a
Author: Guokuai Huang <[email protected]>
AuthorDate: Wed Jun 16 16:01:53 2021 +0800

    [FLINK-22946][runtime] Recycle floating buffer outside the lock to avoid 
deadlock
---
 .../network/partition/consumer/BufferManager.java  | 44 ++++++++++++++--------
 1 file changed, 28 insertions(+), 16 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
index c4b48c4..cfd633f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
@@ -135,6 +135,13 @@ public class BufferManager implements BufferListener, 
BufferRecycler {
                 "The number of exclusive buffers per channel should be larger 
than 0.");
 
         synchronized (bufferQueue) {
+            // AvailableBufferQueue::addExclusiveBuffer may release the 
previously allocated
+            // floating buffer, which requires the caller to recycle these 
released floating
+            // buffers. There should be no floating buffers that have been 
allocated before the
+            // exclusive buffers are initialized, so here only a simple 
assertion is required
+            checkState(
+                    unsynchronizedGetFloatingBuffersAvailable() == 0,
+                    "Bug in buffer allocation logic: floating buffer is 
allocated before exclusive buffers are initialized.");
             for (MemorySegment segment : segments) {
                 bufferQueue.addExclusiveBuffer(
                         new NetworkBuffer(segment, this), numRequiredBuffers);
@@ -188,15 +195,16 @@ public class BufferManager implements BufferListener, 
BufferRecycler {
      */
     @Override
     public void recycle(MemorySegment segment) {
-        int numAddedBuffers = 0;
+        @Nullable Buffer releasedFloatingBuffer = null;
         synchronized (bufferQueue) {
             try {
                 // Similar to notifyBufferAvailable(), make sure that we never 
add a buffer
                 // after channel released all buffers via 
releaseAllResources().
                 if (inputChannel.isReleased()) {
                     
globalPool.recycleMemorySegments(Collections.singletonList(segment));
+                    return;
                 } else {
-                    numAddedBuffers =
+                    releasedFloatingBuffer =
                             bufferQueue.addExclusiveBuffer(
                                     new NetworkBuffer(segment, this), 
numRequiredBuffers);
                 }
@@ -207,10 +215,14 @@ public class BufferManager implements BufferListener, 
BufferRecycler {
             }
         }
 
-        try {
-            inputChannel.notifyBufferAvailable(numAddedBuffers);
-        } catch (Throwable t) {
-            ExceptionUtils.rethrow(t);
+        if (releasedFloatingBuffer != null) {
+            releasedFloatingBuffer.recycleBuffer();
+        } else {
+            try {
+                inputChannel.notifyBufferAvailable(1);
+            } catch (Throwable t) {
+                ExceptionUtils.rethrow(t);
+            }
         }
     }
 
@@ -369,23 +381,23 @@ public class BufferManager implements BufferListener, 
BufferRecycler {
         }
 
         /**
-         * Adds an exclusive buffer (back) into the queue and recycles one 
floating buffer if the
-         * number of available buffers in queue is more than the required 
amount.
+         * Adds an exclusive buffer (back) into the queue and releases one 
floating buffer if the
+         * number of available buffers in queue is more than the required 
amount. If floating buffer
+         * is released, the total amount of available buffers after adding 
this exclusive buffer has
+         * not changed, and no new buffers are available. The caller is 
responsible for recycling
+         * the release/returned floating buffer.
          *
          * @param buffer The exclusive buffer to add
          * @param numRequiredBuffers The number of required buffers
-         * @return How many buffers were added to the queue
+         * @return An released floating buffer, may be null if the 
numRequiredBuffers is not met.
          */
-        int addExclusiveBuffer(Buffer buffer, int numRequiredBuffers) {
+        @Nullable
+        Buffer addExclusiveBuffer(Buffer buffer, int numRequiredBuffers) {
             exclusiveBuffers.add(buffer);
             if (getAvailableBufferSize() > numRequiredBuffers) {
-                Buffer floatingBuffer = floatingBuffers.poll();
-                if (floatingBuffer != null) {
-                    floatingBuffer.recycleBuffer();
-                    return 0;
-                }
+                return floatingBuffers.poll();
             }
-            return 1;
+            return null;
         }
 
         void addFloatingBuffer(Buffer buffer) {

Reply via email to