Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/6257#discussion_r200274433
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean
isBlocking) throws Interrupte
@Override
public void recycle(MemorySegment segment) {
+ BufferListener listener;
synchronized (availableMemorySegments) {
if (isDestroyed || numberOfRequestedMemorySegments >
currentPoolSize) {
returnMemorySegment(segment);
+ return;
}
else {
- BufferListener listener =
registeredListeners.poll();
+ listener = registeredListeners.poll();
if (listener == null) {
availableMemorySegments.add(segment);
availableMemorySegments.notify();
+ return;
}
- else {
- try {
- boolean needMoreBuffers =
listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
- if (needMoreBuffers) {
-
registeredListeners.add(listener);
- }
+ }
+ }
+
+ // We do not know which locks have been acquired before the
recycle() or are needed in the
+ // notification and which other threads also access them.
+ // -> call notifyBufferAvailable() outside of the synchronized
block to avoid a deadlock (FLINK-9676)
+ boolean success = false;
+ boolean needMoreBuffers = false;
+ try {
+ needMoreBuffers = listener.notifyBufferAvailable(new
NetworkBuffer(segment, this));
+ success = true;
+ } catch (Throwable ignored) {
+ // handled below, under the lock
+ }
+
+ if (!success || needMoreBuffers) {
+ synchronized (availableMemorySegments) {
+ if (isDestroyed) {
+ // cleanup tasks how they would have
been done if we only had one synchronized block
+ if (needMoreBuffers) {
+
listener.notifyBufferDestroyed();
--- End diff --
True, this call to `listener.notifyBufferDestroyed()` ~could~ should be done outside the lock as well.
---