zhijiangW commented on a change in pull request #6829: [FLINK-10367][network] 
Introduce NotificationResult for BufferListener to solve recursive stack 
overflow
URL: https://github.com/apache/flink/pull/6829#discussion_r235761668
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ##########
 @@ -258,36 +259,38 @@ private MemorySegment requestMemorySegment(boolean 
isBlocking) throws Interrupte
        @Override
        public void recycle(MemorySegment segment) {
                BufferListener listener;
-               synchronized (availableMemorySegments) {
-                       if (isDestroyed || numberOfRequestedMemorySegments > 
currentPoolSize) {
-                               returnMemorySegment(segment);
-                               return;
-                       } else {
-                               listener = registeredListeners.poll();
-
-                               if (listener == null) {
-                                       availableMemorySegments.add(segment);
-                                       availableMemorySegments.notify();
+               NotificationResult notificationResult = NotificationResult.NONE;
+               while (!notificationResult.bufferUsed()) {
+                       synchronized (availableMemorySegments) {
+                               if (isDestroyed || 
numberOfRequestedMemorySegments > currentPoolSize) {
+                                       returnMemorySegment(segment);
                                        return;
+                               } else {
+                                       listener = registeredListeners.poll();
+
+                                       if (listener == null) {
+                                               
availableMemorySegments.add(segment);
+                                               
availableMemorySegments.notify();
+                                               return;
+                                       }
                                }
                        }
-               }
-
-               // We do not know which locks have been acquired before the 
recycle() or are needed in the
-               // notification and which other threads also access them.
-               // -> call notifyBufferAvailable() outside of the synchronized 
block to avoid a deadlock (FLINK-9676)
-               // Note that in case of any exceptions notifyBufferAvailable() 
should recycle the buffer
-               // (either directly or later during error handling) and 
therefore eventually end up in this
-               // method again.
-               boolean needMoreBuffers = listener.notifyBufferAvailable(new 
NetworkBuffer(segment, this));
 
-               if (needMoreBuffers) {
-                       synchronized (availableMemorySegments) {
-                               if (isDestroyed) {
-                                       // cleanup tasks how they would have 
been done if we only had one synchronized block
-                                       listener.notifyBufferDestroyed();
-                               } else {
-                                       registeredListeners.add(listener);
+                       // We do not know which locks have been acquired before 
the recycle() or are needed in the
 
 Review comment:
   Make sense, the logic indeeds a little long for `recycle` method.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to