This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push:
new 949db6f [FLINK-22946][runtime] Recycle floating buffer outside the
lock to avoid deadlock
949db6f is described below
commit 949db6fdf7b7fa4d95d10e7b0782d66a2b0c1fd1
Author: Guokuai Huang <[email protected]>
AuthorDate: Wed Jun 16 16:01:53 2021 +0800
[FLINK-22946][runtime] Recycle floating buffer outside the lock to avoid
deadlock
---
.../network/partition/consumer/BufferManager.java | 44 ++++++++++++++--------
1 file changed, 28 insertions(+), 16 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
index 2e5062e..ddef020 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
@@ -135,6 +135,13 @@ public class BufferManager implements BufferListener,
BufferRecycler {
"The number of exclusive buffers per channel should be larger
than 0.");
synchronized (bufferQueue) {
+ // AvailableBufferQueue::addExclusiveBuffer may release the
previously allocated
+ // floating buffer, which requires the caller to recycle these
released floating
+ // buffers. There should be no floating buffers that have been
allocated before the
+ // exclusive buffers are initialized, so here only a simple
assertion is required
+ checkState(
+ unsynchronizedGetFloatingBuffersAvailable() == 0,
+ "Bug in buffer allocation logic: floating buffer is
allocated before exclusive buffers are initialized.");
for (MemorySegment segment : segments) {
bufferQueue.addExclusiveBuffer(
new NetworkBuffer(segment, this), numRequiredBuffers);
@@ -187,15 +194,16 @@ public class BufferManager implements BufferListener,
BufferRecycler {
*/
@Override
public void recycle(MemorySegment segment) {
- int numAddedBuffers = 0;
+ @Nullable Buffer releasedFloatingBuffer = null;
synchronized (bufferQueue) {
try {
// Similar to notifyBufferAvailable(), make sure that we never
add a buffer
// after channel released all buffers via
releaseAllResources().
if (inputChannel.isReleased()) {
globalPool.recycleMemorySegments(Collections.singletonList(segment));
+ return;
} else {
- numAddedBuffers =
+ releasedFloatingBuffer =
bufferQueue.addExclusiveBuffer(
new NetworkBuffer(segment, this),
numRequiredBuffers);
}
@@ -206,10 +214,14 @@ public class BufferManager implements BufferListener,
BufferRecycler {
}
}
- try {
- inputChannel.notifyBufferAvailable(numAddedBuffers);
- } catch (Throwable t) {
- ExceptionUtils.rethrow(t);
+ if (releasedFloatingBuffer != null) {
+ releasedFloatingBuffer.recycleBuffer();
+ } else {
+ try {
+ inputChannel.notifyBufferAvailable(1);
+ } catch (Throwable t) {
+ ExceptionUtils.rethrow(t);
+ }
}
}
@@ -368,23 +380,23 @@ public class BufferManager implements BufferListener,
BufferRecycler {
}
/**
- * Adds an exclusive buffer (back) into the queue and recycles one
floating buffer if the
- * number of available buffers in queue is more than the required
amount.
+ * Adds an exclusive buffer (back) into the queue and releases one
floating buffer if the
+ * number of available buffers in queue is more than the required
amount. If floating buffer
+ * is released, the total amount of available buffers after adding
this exclusive buffer has
+ * not changed, and no new buffers are available. The caller is
responsible for recycling
+ * the released/returned floating buffer.
*
* @param buffer The exclusive buffer to add
* @param numRequiredBuffers The number of required buffers
- * @return How many buffers were added to the queue
+ * @return A released floating buffer, may be null if the
numRequiredBuffers is not met.
*/
- int addExclusiveBuffer(Buffer buffer, int numRequiredBuffers) {
+ @Nullable
+ Buffer addExclusiveBuffer(Buffer buffer, int numRequiredBuffers) {
exclusiveBuffers.add(buffer);
if (getAvailableBufferSize() > numRequiredBuffers) {
- Buffer floatingBuffer = floatingBuffers.poll();
- if (floatingBuffer != null) {
- floatingBuffer.recycleBuffer();
- return 0;
- }
+ return floatingBuffers.poll();
}
- return 1;
+ return null;
}
void addFloatingBuffer(Buffer buffer) {