[
https://issues.apache.org/jira/browse/FLINK-10367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16700498#comment-16700498
]
ASF GitHub Bot commented on FLINK-10367:
----------------------------------------
pnowojski closed pull request #6829: [FLINK-10367][network] Introduce
NotificationResult for BufferListener to solve recursive stack overflow
URL: https://github.com/apache/flink/pull/6829
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java
index 4cc32c0a661..e6b5416d986 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java
@@ -24,6 +24,41 @@
*/
public interface BufferListener {
+ /**
+ * Status of the notification result from the buffer listener.
+ */
+ enum NotificationResult {
+ BUFFER_NOT_USED(false, false),
+ BUFFER_USED_NO_NEED_MORE(true, false),
+ BUFFER_USED_NEED_MORE(true, true);
+
+ private final boolean isBufferUsed;
+ private final boolean needsMoreBuffers;
+
+ NotificationResult(boolean isBufferUsed, boolean needsMoreBuffers) {
+ this.isBufferUsed = isBufferUsed;
+ this.needsMoreBuffers = needsMoreBuffers;
+ }
+
+ /**
+ * Whether the notified buffer is accepted to use by the listener.
+ *
+ * @return <tt>true</tt> if the notified buffer is accepted.
+ */
+ boolean isBufferUsed() {
+ return isBufferUsed;
+ }
+
+ /**
+ * Whether the listener still needs more buffers to be notified.
+ *
+ * @return <tt>true</tt> if the listener is still waiting for more buffers.
+ */
+ boolean needsMoreBuffers() {
+ return needsMoreBuffers;
+ }
+ }
+
/**
* Notification callback if a buffer is recycled and becomes available in buffer pool.
*
@@ -37,9 +72,9 @@
* stack!
*
* @param buffer buffer that becomes available in buffer pool.
- * @return true if the listener wants to be notified next time.
+ * @return NotificationResult if the listener wants to be notified next time.
*/
- boolean notifyBufferAvailable(Buffer buffer);
+ NotificationResult notifyBufferAvailable(Buffer buffer);
/**
* Notification callback if the buffer provider is destroyed.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index 1596fded6f3..273822746fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.buffer;
import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.BufferListener.NotificationResult;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
@@ -258,30 +259,31 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
@Override
public void recycle(MemorySegment segment) {
BufferListener listener;
- synchronized (availableMemorySegments) {
- if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
- returnMemorySegment(segment);
- return;
- } else {
- listener = registeredListeners.poll();
-
- if (listener == null) {
- availableMemorySegments.add(segment);
- availableMemorySegments.notify();
+ NotificationResult notificationResult = NotificationResult.BUFFER_NOT_USED;
+ while (!notificationResult.isBufferUsed()) {
+ synchronized (availableMemorySegments) {
+ if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
+ returnMemorySegment(segment);
return;
+ } else {
+ listener = registeredListeners.poll();
+ if (listener == null) {
+ availableMemorySegments.add(segment);
+ availableMemorySegments.notify();
+ return;
+ }
}
}
+ notificationResult = fireBufferAvailableNotification(listener, segment);
}
+ }
+ private NotificationResult fireBufferAvailableNotification(BufferListener listener, MemorySegment segment) {
// We do not know which locks have been acquired before the recycle() or are needed in the
// notification and which other threads also access them.
// -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
- // Note that in case of any exceptions notifyBufferAvailable() should recycle the buffer
- // (either directly or later during error handling) and therefore eventually end up in this
- // method again.
- boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-
- if (needMoreBuffers) {
+ NotificationResult notificationResult = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+ if (notificationResult.needsMoreBuffers()) {
synchronized (availableMemorySegments) {
if (isDestroyed) {
// cleanup tasks how they would have been done if we only had one synchronized block
@@ -291,6 +293,7 @@ public void recycle(MemorySegment segment) {
}
}
}
+ return notificationResult;
}
/**
@@ -388,5 +391,4 @@ private void returnExcessMemorySegments() {
returnMemorySegment(segment);
}
}
-
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index c5ba7a4b7f1..34f65c0f22d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -417,32 +417,18 @@ public void notifyBufferDestroyed() {
// Called by the recycling thread (not network I/O thread)
@Override
- public boolean notifyBufferAvailable(Buffer buffer) {
- boolean success = false;
-
- try {
- if (availableBuffer.compareAndSet(null, buffer)) {
- ctx.channel().eventLoop().execute(this);
+ public NotificationResult notifyBufferAvailable(Buffer buffer) {
+ if (availableBuffer.compareAndSet(null, buffer)) {
+ ctx.channel().eventLoop().execute(this);
- success = true;
- }
- else {
- throw new IllegalStateException("Received a buffer notification, " +
- " but the previous one has not been handled yet.");
- }
- }
- catch (Throwable t) {
- ctx.channel().eventLoop().execute(new AsyncErrorNotificationTask(t));
+ return NotificationResult.BUFFER_USED_NO_NEED_MORE;
}
- finally {
- if (!success) {
- if (buffer != null) {
- buffer.recycleBuffer();
- }
- }
+ else {
+ ctx.channel().eventLoop().execute(new AsyncErrorNotificationTask(
+ new IllegalStateException("Received a buffer notification, " +
+ " but the previous one has not been handled yet.")));
+ return NotificationResult.BUFFER_NOT_USED;
}
-
- return false;
}
/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 6738abd7f9c..141494996c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -342,18 +342,18 @@ boolean isWaitingForFloatingBuffers() {
/**
* The Buffer pool notifies this channel of an available floating buffer. If the channel is released or
- * currently does not need extra buffers, the buffer should be recycled to the buffer pool. Otherwise,
+ * currently does not need extra buffers, the buffer should be returned to the buffer pool. Otherwise,
* the buffer will be added into the <tt>bufferQueue</tt> and the unannounced credit is increased
* by one.
*
* @param buffer Buffer that becomes available in buffer pool.
- * @return True when this channel is waiting for more floating buffers, otherwise false.
+ * @return NotificationResult indicates whether this channel accepts the buffer and is waiting for
+ * more floating buffers.
*/
@Override
- public boolean notifyBufferAvailable(Buffer buffer) {
- boolean recycleBuffer = true;
+ public NotificationResult notifyBufferAvailable(Buffer buffer) {
+ NotificationResult notificationResult = NotificationResult.BUFFER_NOT_USED;
try {
- boolean needMoreBuffers = false;
synchronized (bufferQueue) {
checkState(isWaitingForFloatingBuffers,
"This channel should be waiting for floating buffers.");
@@ -364,36 +364,29 @@ public boolean notifyBufferAvailable(Buffer buffer) {
// -> then isReleased is set correctly
// 2) releaseAllResources() did not yet release buffers from bufferQueue
// -> we may or may not have set isReleased yet but will always wait for the
- // lock on bufferQueue to release buffers
+ // lock on bufferQueue to release buffers
if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
isWaitingForFloatingBuffers = false;
- recycleBuffer = false; // just in case
- buffer.recycleBuffer();
- return false;
+ return notificationResult;
}
- recycleBuffer = false;
bufferQueue.addFloatingBuffer(buffer);
if (bufferQueue.getAvailableBufferSize() == numRequiredBuffers) {
isWaitingForFloatingBuffers = false;
+ notificationResult = NotificationResult.BUFFER_USED_NO_NEED_MORE;
} else {
- needMoreBuffers = true;
+ notificationResult = NotificationResult.BUFFER_USED_NEED_MORE;
}
}
if (unannouncedCredit.getAndAdd(1) == 0) {
notifyCreditAvailable();
}
-
- return needMoreBuffers;
} catch (Throwable t) {
- if (recycleBuffer) {
- buffer.recycleBuffer();
- }
setError(t);
- return false;
}
+ return notificationResult;
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index 537d167908f..a0e10d7c687 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -414,10 +414,14 @@ private BufferListener createBufferListener(int notificationTimes) {
AtomicInteger times = new AtomicInteger(0);
@Override
- public boolean notifyBufferAvailable(Buffer buffer) {
+ public NotificationResult notifyBufferAvailable(Buffer buffer) {
int newCount = times.incrementAndGet();
buffer.recycleBuffer();
- return newCount < notificationTimes;
+ if (newCount < notificationTimes) {
+ return NotificationResult.BUFFER_USED_NEED_MORE;
+ } else {
+ return NotificationResult.BUFFER_USED_NO_NEED_MORE;
+ }
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index ec80459f0ea..7747421fc7b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -23,6 +23,7 @@
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferListener.NotificationResult;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
@@ -150,7 +151,10 @@ private void testConcurrentReleaseAndSomething(
for (int j = 0; j < 128; j++) {
// this is the same buffer over and over again which will be
// recycled by the RemoteInputChannel
- function.apply(inputChannel, buffer.retainBuffer(), j);
+ Object obj = function.apply(inputChannel, buffer.retainBuffer(), j);
+ if (obj instanceof NotificationResult && obj == NotificationResult.BUFFER_NOT_USED) {
+ buffer.recycleBuffer();
+ }
}
if (inputChannel.isReleased()) {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Avoid recursion stack overflow during releasing SingleInputGate
> ---------------------------------------------------------------
>
> Key: FLINK-10367
> URL: https://issues.apache.org/jira/browse/FLINK-10367
> Project: Flink
> Issue Type: Sub-task
> Components: Network
> Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0
> Reporter: zhijiang
> Assignee: zhijiang
> Priority: Minor
> Labels: pull-request-available
>
> On task failure or cancellation, {{SingleInputGate#releaseAllResources}}
> is invoked before the task exits.
> In {{SingleInputGate#releaseAllResources}}, we first loop to
> release all the input channels, then destroy the {{BufferPool}}.
> {{RemoteInputChannel#releaseAllResources}} returns floating buffers
> to the {{BufferPool}}, which assigns each recycled buffer to another
> listener ({{RemoteInputChannel}}).
> This process can involve recursive calls. If the notified listener has
> already been released, it directly recycles the buffer to the {{BufferPool}},
> which takes another listener to notify of the available buffer. This
> may repeat recursively.
> If many input channels are registered as listeners in the {{BufferPool}}, the
> recursion causes a {{StackOverflowError}}. In one of our test jobs,
> a scale of 10,000 input channels caused this error.
> I see two ways to solve this potential problem:
> # When the input channel is released, it should notify the {{BufferPool}} to
> unregister this listener; otherwise the two are inconsistent.
> # {{SingleInputGate}} should destroy the {{BufferPool}} first, then loop to
> release all the internal input channels. That way, all the listeners in the
> {{BufferPool}} are removed during destruction, and the input channel has
> no further interactions with the pool during
> {{RemoteInputChannel#releaseAllResources}}.
> I prefer the second way, because we do not want to
> add another interface method for removing a buffer listener; furthermore,
> the current internal data structure in {{BufferPool}} cannot
> remove a listener directly.
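
For readers following the quoted description: the diff above appears to take a
third route. The listener reports through NotificationResult whether it used
the buffer, and the pool loops over its listeners instead of having each
released listener call recycle() again. Below is a minimal, self-contained
sketch of that idea. RecycleLoopSketch, Pool, Listener and
poolWithReleasedListeners are hypothetical, simplified stand-ins and not the
real Flink classes; locking and the destroyed-pool checks are omitted, and
re-queuing a listener that asks for more buffers is an assumption of this
sketch.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, simplified stand-ins for illustration; not the real Flink classes.
final class RecycleLoopSketch {

    // Mirrors the three-state result introduced by the pull request.
    enum NotificationResult {
        BUFFER_NOT_USED(false, false),
        BUFFER_USED_NO_NEED_MORE(true, false),
        BUFFER_USED_NEED_MORE(true, true);

        private final boolean isBufferUsed;
        private final boolean needsMoreBuffers;

        NotificationResult(boolean isBufferUsed, boolean needsMoreBuffers) {
            this.isBufferUsed = isBufferUsed;
            this.needsMoreBuffers = needsMoreBuffers;
        }

        boolean isBufferUsed() {
            return isBufferUsed;
        }

        boolean needsMoreBuffers() {
            return needsMoreBuffers;
        }
    }

    // Stand-in for BufferListener; the buffer is a plain Object here.
    interface Listener {
        NotificationResult notifyBufferAvailable(Object buffer);
    }

    // Stand-in for the buffer pool, without synchronization or destroy handling.
    static final class Pool {
        private final Queue<Listener> registeredListeners = new ArrayDeque<>();
        private final Queue<Object> availableBuffers = new ArrayDeque<>();
        private int notifications;

        void register(Listener listener) {
            registeredListeners.add(listener);
        }

        int notifications() {
            return notifications;
        }

        int availableBuffers() {
            return availableBuffers.size();
        }

        // Offers the buffer to one listener after another until one uses it.
        // A declining listener returns BUFFER_NOT_USED instead of calling
        // recycle() itself, so the stack depth stays constant.
        void recycle(Object buffer) {
            NotificationResult result = NotificationResult.BUFFER_NOT_USED;
            while (!result.isBufferUsed()) {
                Listener listener = registeredListeners.poll();
                if (listener == null) {
                    // Nobody is waiting: keep the buffer in the pool.
                    availableBuffers.add(buffer);
                    return;
                }
                notifications++;
                result = listener.notifyBufferAvailable(buffer);
                if (result.needsMoreBuffers()) {
                    // Assumption of this sketch: re-queue for the next buffer.
                    registeredListeners.add(listener);
                }
            }
        }
    }

    // Builds a pool whose n listeners all behave like released input channels.
    static Pool poolWithReleasedListeners(int n) {
        Pool pool = new Pool();
        for (int i = 0; i < n; i++) {
            pool.register(buffer -> NotificationResult.BUFFER_NOT_USED);
        }
        return pool;
    }
}
```

With this shape, recycling one buffer into a pool that holds many released
listeners walks the listener queue in a loop, so the number of listeners no
longer determines the call depth.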
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)