[ 
https://issues.apache.org/jira/browse/FLINK-10367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16700498#comment-16700498
 ] 

ASF GitHub Bot commented on FLINK-10367:
----------------------------------------

pnowojski closed pull request #6829: [FLINK-10367][network] Introduce 
NotificationResult for BufferListener to solve recursive stack overflow
URL: https://github.com/apache/flink/pull/6829
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java
index 4cc32c0a661..e6b5416d986 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java
@@ -24,6 +24,41 @@
  */
 public interface BufferListener {
 
+       /**
+        * Status of the notification result from the buffer listener.
+        */
+       enum NotificationResult {
+               BUFFER_NOT_USED(false, false),
+               BUFFER_USED_NO_NEED_MORE(true, false),
+               BUFFER_USED_NEED_MORE(true, true);
+
+               private final boolean isBufferUsed;
+               private final boolean needsMoreBuffers;
+
+               NotificationResult(boolean isBufferUsed, boolean needsMoreBuffers) {
+                       this.isBufferUsed = isBufferUsed;
+                       this.needsMoreBuffers = needsMoreBuffers;
+               }
+
+               /**
+                * Whether the notified buffer is accepted to use by the listener.
+                *
+                * @return <tt>true</tt> if the notified buffer is accepted.
+                */
+               boolean isBufferUsed() {
+                       return isBufferUsed;
+               }
+
+               /**
+                * Whether the listener still needs more buffers to be notified.
+                *
+                * @return <tt>true</tt> if the listener is still waiting for more buffers.
+                */
+               boolean needsMoreBuffers() {
+                       return needsMoreBuffers;
+               }
+       }
+
        /**
         * Notification callback if a buffer is recycled and becomes available in buffer pool.
         *
@@ -37,9 +72,9 @@
         * stack!
         *
         * @param buffer buffer that becomes available in buffer pool.
-        * @return true if the listener wants to be notified next time.
+        * @return NotificationResult if the listener wants to be notified next time.
         */
-       boolean notifyBufferAvailable(Buffer buffer);
+       NotificationResult notifyBufferAvailable(Buffer buffer);
 
        /**
         * Notification callback if the buffer provider is destroyed.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index 1596fded6f3..273822746fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.buffer;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.BufferListener.NotificationResult;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
@@ -258,30 +259,31 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
        @Override
        public void recycle(MemorySegment segment) {
                BufferListener listener;
-               synchronized (availableMemorySegments) {
-                       if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
-                               returnMemorySegment(segment);
-                               return;
-                       } else {
-                               listener = registeredListeners.poll();
-
-                               if (listener == null) {
-                                       availableMemorySegments.add(segment);
-                                       availableMemorySegments.notify();
+               NotificationResult notificationResult = NotificationResult.BUFFER_NOT_USED;
+               while (!notificationResult.isBufferUsed()) {
+                       synchronized (availableMemorySegments) {
+                               if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
+                                       returnMemorySegment(segment);
                                        return;
+                               } else {
+                                       listener = registeredListeners.poll();
+                                       if (listener == null) {
+                                               availableMemorySegments.add(segment);
+                                               availableMemorySegments.notify();
+                                               return;
+                                       }
                                }
                        }
+                       notificationResult = fireBufferAvailableNotification(listener, segment);
                }
+       }
 
+       private NotificationResult fireBufferAvailableNotification(BufferListener listener, MemorySegment segment) {
                // We do not know which locks have been acquired before the recycle() or are needed in the
                // notification and which other threads also access them.
                // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
-               // Note that in case of any exceptions notifyBufferAvailable() should recycle the buffer
-               // (either directly or later during error handling) and therefore eventually end up in this
-               // method again.
-               boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-
-               if (needMoreBuffers) {
+               NotificationResult notificationResult = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+               if (notificationResult.needsMoreBuffers()) {
                        synchronized (availableMemorySegments) {
                                if (isDestroyed) {
                                        // cleanup tasks how they would have been done if we only had one synchronized block
@@ -291,6 +293,7 @@ public void recycle(MemorySegment segment) {
                                }
                        }
                }
+               return notificationResult;
        }
 
        /**
@@ -388,5 +391,4 @@ private void returnExcessMemorySegments() {
                        returnMemorySegment(segment);
                }
        }
-
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index c5ba7a4b7f1..34f65c0f22d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -417,32 +417,18 @@ public void notifyBufferDestroyed() {
 
                // Called by the recycling thread (not network I/O thread)
                @Override
-               public boolean notifyBufferAvailable(Buffer buffer) {
-                       boolean success = false;
-
-                       try {
-                               if (availableBuffer.compareAndSet(null, buffer)) {
-                                       ctx.channel().eventLoop().execute(this);
+               public NotificationResult notifyBufferAvailable(Buffer buffer) {
+                       if (availableBuffer.compareAndSet(null, buffer)) {
+                               ctx.channel().eventLoop().execute(this);
 
-                                       success = true;
-                               }
-                               else {
-                                       throw new IllegalStateException("Received a buffer notification, " +
-                                                       " but the previous one has not been handled yet.");
-                               }
-                       }
-                       catch (Throwable t) {
-                               ctx.channel().eventLoop().execute(new AsyncErrorNotificationTask(t));
+                               return NotificationResult.BUFFER_USED_NO_NEED_MORE;
                        }
-                       finally {
-                               if (!success) {
-                                       if (buffer != null) {
-                                               buffer.recycleBuffer();
-                                       }
-                               }
+                       else {
+                               ctx.channel().eventLoop().execute(new AsyncErrorNotificationTask(
+                                       new IllegalStateException("Received a buffer notification, " +
+                                               " but the previous one has not been handled yet.")));
+                               return NotificationResult.BUFFER_NOT_USED;
                        }
-
-                       return false;
                }
 
                /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 6738abd7f9c..141494996c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -342,18 +342,18 @@ boolean isWaitingForFloatingBuffers() {
 
        /**
         * The Buffer pool notifies this channel of an available floating buffer. If the channel is released or
-        * currently does not need extra buffers, the buffer should be recycled to the buffer pool. Otherwise,
+        * currently does not need extra buffers, the buffer should be returned to the buffer pool. Otherwise,
         * the buffer will be added into the <tt>bufferQueue</tt> and the unannounced credit is increased
         * by one.
         *
         * @param buffer Buffer that becomes available in buffer pool.
-        * @return True when this channel is waiting for more floating buffers, otherwise false.
+        * @return NotificationResult indicates whether this channel accepts the buffer and is waiting for
+        *      more floating buffers.
         */
        @Override
-       public boolean notifyBufferAvailable(Buffer buffer) {
-               boolean recycleBuffer = true;
+       public NotificationResult notifyBufferAvailable(Buffer buffer) {
+               NotificationResult notificationResult = NotificationResult.BUFFER_NOT_USED;
                try {
-                       boolean needMoreBuffers = false;
                        synchronized (bufferQueue) {
                                checkState(isWaitingForFloatingBuffers,
                                        "This channel should be waiting for floating buffers.");
@@ -364,36 +364,29 @@ public boolean notifyBufferAvailable(Buffer buffer) {
                                // -> then isReleased is set correctly
                                // 2) releaseAllResources() did not yet release buffers from bufferQueue
                                // -> we may or may not have set isReleased yet but will always wait for the
-                               //    lock on bufferQueue to release buffers
+                               // lock on bufferQueue to release buffers
                                if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
                                        isWaitingForFloatingBuffers = false;
-                                       recycleBuffer = false; // just in case
-                                       buffer.recycleBuffer();
-                                       return false;
+                                       return notificationResult;
                                }
 
-                               recycleBuffer = false;
                                bufferQueue.addFloatingBuffer(buffer);
 
                                if (bufferQueue.getAvailableBufferSize() == numRequiredBuffers) {
                                        isWaitingForFloatingBuffers = false;
+                                       notificationResult = NotificationResult.BUFFER_USED_NO_NEED_MORE;
                                } else {
-                                       needMoreBuffers = true;
+                                       notificationResult = NotificationResult.BUFFER_USED_NEED_MORE;
                                }
                        }
 
                        if (unannouncedCredit.getAndAdd(1) == 0) {
                                notifyCreditAvailable();
                        }
-
-                       return needMoreBuffers;
                } catch (Throwable t) {
-                       if (recycleBuffer) {
-                               buffer.recycleBuffer();
-                       }
                        setError(t);
-                       return false;
                }
+               return notificationResult;
        }
 
        @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index 537d167908f..a0e10d7c687 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -414,10 +414,14 @@ private BufferListener createBufferListener(int notificationTimes) {
                        AtomicInteger times = new AtomicInteger(0);
 
                        @Override
-                       public boolean notifyBufferAvailable(Buffer buffer) {
+                       public NotificationResult notifyBufferAvailable(Buffer buffer) {
                                int newCount = times.incrementAndGet();
                                buffer.recycleBuffer();
-                               return newCount < notificationTimes;
+                               if (newCount < notificationTimes) {
+                                       return NotificationResult.BUFFER_USED_NEED_MORE;
+                               } else {
+                                       return NotificationResult.BUFFER_USED_NO_NEED_MORE;
+                               }
                        }
 
                        @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index ec80459f0ea..7747421fc7b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -23,6 +23,7 @@
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferListener.NotificationResult;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
@@ -150,7 +151,10 @@ private void testConcurrentReleaseAndSomething(
                                                for (int j = 0; j < 128; j++) {
                                                        // this is the same buffer over and over again which will be
                                                        // recycled by the RemoteInputChannel
-                                                       function.apply(inputChannel, buffer.retainBuffer(), j);
+                                                       Object obj = function.apply(inputChannel, buffer.retainBuffer(), j);
+                                                       if (obj instanceof NotificationResult && obj == NotificationResult.BUFFER_NOT_USED) {
+                                                               buffer.recycleBuffer();
+                                                       }
                                                }
 
                                                if (inputChannel.isReleased()) {
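
For readers who want the core idea of the diff above in isolation, here is a
minimal single-threaded sketch of the loop-based hand-off. It is not the Flink
implementation: ToyPool, Listener, run() and the use of String as a stand-in
for MemorySegment are hypothetical names invented for illustration, and the
locking, the isDestroyed check and the pool-size check from LocalBufferPool
are left out. Only the NotificationResult values mirror the diff.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative sketch only; the types here are simplified stand-ins, not Flink classes.
final class RecycleLoopSketch {

    // Same three states as the enum added to BufferListener in the diff.
    enum NotificationResult {
        BUFFER_NOT_USED(false, false),
        BUFFER_USED_NO_NEED_MORE(true, false),
        BUFFER_USED_NEED_MORE(true, true);

        private final boolean isBufferUsed;
        private final boolean needsMoreBuffers;

        NotificationResult(boolean isBufferUsed, boolean needsMoreBuffers) {
            this.isBufferUsed = isBufferUsed;
            this.needsMoreBuffers = needsMoreBuffers;
        }

        boolean isBufferUsed() {
            return isBufferUsed;
        }

        boolean needsMoreBuffers() {
            return needsMoreBuffers;
        }
    }

    // Hypothetical listener type; a String stands in for a memory segment.
    interface Listener {
        NotificationResult notifyBufferAvailable(String segment);
    }

    // Hypothetical pool without any synchronization.
    static final class ToyPool {
        private final Queue<Listener> registeredListeners = new ArrayDeque<>();
        private final Queue<String> availableSegments = new ArrayDeque<>();
        private int notifications;

        void addListener(Listener listener) {
            registeredListeners.add(listener);
        }

        // Offers the segment to one listener after another until one accepts it.
        // A listener that declines returns BUFFER_NOT_USED instead of calling
        // recycle() itself, so the stack depth stays constant.
        void recycle(String segment) {
            NotificationResult result = NotificationResult.BUFFER_NOT_USED;
            while (!result.isBufferUsed()) {
                Listener listener = registeredListeners.poll();
                if (listener == null) {
                    availableSegments.add(segment);
                    return;
                }
                notifications++;
                result = listener.notifyBufferAvailable(segment);
                if (result.needsMoreBuffers()) {
                    registeredListeners.add(listener);
                }
            }
        }
    }

    // Registers listeners that all decline the buffer, then recycles one segment.
    // Returns {number of notifications, number of segments back in the pool}.
    static int[] run(int decliningListeners) {
        ToyPool pool = new ToyPool();
        for (int i = 0; i < decliningListeners; i++) {
            pool.addListener(segment -> NotificationResult.BUFFER_NOT_USED);
        }
        pool.recycle("segment-0");
        return new int[] {pool.notifications, pool.availableSegments.size()};
    }

    public static void main(String[] args) {
        int[] result = run(100_000);
        System.out.println(result[0] + " notifications, " + result[1] + " segment(s) in the pool");
    }
}
```

In this sketch, a long chain of listeners that no longer want the buffer is
walked by the while loop rather than by nested recycle() calls, which is the
property the pull request title refers to.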


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Avoid recursion stack overflow during releasing SingleInputGate
> ---------------------------------------------------------------
>
>                 Key: FLINK-10367
>                 URL: https://issues.apache.org/jira/browse/FLINK-10367
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>    Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Minor
>              Labels: pull-request-available
>
> When a task fails or is canceled, {{SingleInputGate#releaseAllResources}} 
> is invoked before the task exits.
> {{SingleInputGate#releaseAllResources}} first loops over all the input 
> channels to release them and then destroys the {{BufferPool}}. 
> {{RemoteInputChannel#releaseAllResources}} returns its floating buffers 
> to the {{BufferPool}}, which assigns each recycled buffer to another 
> registered listener (a {{RemoteInputChannel}}). 
> This process can become recursive. If the notified listener has already been 
> released, it recycles the buffer directly back to the {{BufferPool}}, 
> which takes another listener and notifies it of the available buffer. These 
> steps may repeat recursively.
> If many input channels are registered as listeners in the {{BufferPool}}, the 
> recursion causes a {{StackOverflow}} error. In our testing job, 
> a scale of 10,000 input channels once caused this error.
> I can think of two ways to solve this potential problem:
>  # When an input channel is released, it should notify the {{BufferPool}} to 
> unregister this listener; otherwise the two are inconsistent.
>  # {{SingleInputGate}} should destroy the {{BufferPool}} first and then loop to 
> release all the internal input channels. This way, all the listeners in the 
> {{BufferPool}} are removed when it is destroyed, and the input channels have 
> no further interaction with it during 
> {{RemoteInputChannel#releaseAllResources}}.
> I prefer the second way, because we do not want to 
> add another interface method for removing a buffer listener, and 
> the current internal data structure in {{BufferPool}} cannot 
> remove a listener directly.
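
The recursion described in the issue can be reproduced in a small
single-threaded sketch. It is an illustration under stated assumptions, not
Flink code: OldStylePool, Listener and maxRecycleDepth() are hypothetical
names, a String stands in for a buffer, and the listener models an already
released channel by recycling the buffer from inside the callback.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative sketch only; simplified stand-ins for the pool and its listeners.
final class RecursiveRecycleSketch {

    // Hypothetical listener with the boolean return value used before the change.
    interface Listener {
        boolean notifyBufferAvailable(String segment);
    }

    // Hypothetical pool that notifies a single listener per recycle() call.
    static final class OldStylePool {
        private final Queue<Listener> registeredListeners = new ArrayDeque<>();
        private int available;
        private int depth;
        private int maxDepth;

        void recycle(String segment) {
            depth++;
            maxDepth = Math.max(maxDepth, depth);
            try {
                Listener listener = registeredListeners.poll();
                if (listener == null) {
                    available++;
                    return;
                }
                // The callback may call recycle() again before this frame returns.
                if (listener.notifyBufferAvailable(segment)) {
                    registeredListeners.add(listener);
                }
            } finally {
                depth--;
            }
        }
    }

    // Registers listeners that behave like released channels and reports how
    // deeply recycle() calls were nested while one segment was handed around.
    static int maxRecycleDepth(int releasedChannels) {
        OldStylePool pool = new OldStylePool();
        for (int i = 0; i < releasedChannels; i++) {
            pool.registeredListeners.add(segment -> {
                pool.recycle(segment); // released channel hands the buffer straight back
                return false;          // and does not want further notifications
            });
        }
        pool.recycle("segment-0");
        return pool.maxDepth;
    }

    public static void main(String[] args) {
        System.out.println("max nesting depth: " + maxRecycleDepth(1_000));
    }
}
```

The nesting depth in this sketch grows by one for each released listener that
is still registered, so a large enough listener count would exhaust the thread
stack, which matches the failure mode reported above.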



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
