zhijiangW commented on a change in pull request #6829: [FLINK-10367][network]
Introduce NotificationResult for BufferListener to solve recursive stack
overflow
URL: https://github.com/apache/flink/pull/6829#discussion_r235761668
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
##########
@@ -258,36 +259,38 @@ private MemorySegment requestMemorySegment(boolean
isBlocking) throws Interrupte
@Override
public void recycle(MemorySegment segment) {
BufferListener listener;
- synchronized (availableMemorySegments) {
- if (isDestroyed || numberOfRequestedMemorySegments >
currentPoolSize) {
- returnMemorySegment(segment);
- return;
- } else {
- listener = registeredListeners.poll();
-
- if (listener == null) {
- availableMemorySegments.add(segment);
- availableMemorySegments.notify();
+ NotificationResult notificationResult = NotificationResult.NONE;
+ while (!notificationResult.bufferUsed()) {
+ synchronized (availableMemorySegments) {
+ if (isDestroyed ||
numberOfRequestedMemorySegments > currentPoolSize) {
+ returnMemorySegment(segment);
return;
+ } else {
+ listener = registeredListeners.poll();
+
+ if (listener == null) {
+
availableMemorySegments.add(segment);
+
availableMemorySegments.notify();
+ return;
+ }
}
}
- }
-
- // We do not know which locks have been acquired before the
recycle() or are needed in the
- // notification and which other threads also access them.
- // -> call notifyBufferAvailable() outside of the synchronized
block to avoid a deadlock (FLINK-9676)
- // Note that in case of any exceptions notifyBufferAvailable()
should recycle the buffer
- // (either directly or later during error handling) and
therefore eventually end up in this
- // method again.
- boolean needMoreBuffers = listener.notifyBufferAvailable(new
NetworkBuffer(segment, this));
- if (needMoreBuffers) {
- synchronized (availableMemorySegments) {
- if (isDestroyed) {
- // cleanup tasks how they would have
been done if we only had one synchronized block
- listener.notifyBufferDestroyed();
- } else {
- registeredListeners.add(listener);
+ // We do not know which locks have been acquired before
the recycle() or are needed in the
Review comment:
Make sense, the logic indeeds a little long for `recycle` method.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services