[ https://issues.apache.org/jira/browse/FLINK-10367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16696018#comment-16696018 ]
ASF GitHub Bot commented on FLINK-10367:
----------------------------------------
zhijiangW commented on a change in pull request #6829: [FLINK-10367][network]
Introduce NotificationResult for BufferListener to solve recursive stack
overflow
URL: https://github.com/apache/flink/pull/6829#discussion_r235761668
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
##########
@@ -258,36 +259,38 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
 	@Override
 	public void recycle(MemorySegment segment) {
 		BufferListener listener;
-		synchronized (availableMemorySegments) {
-			if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
-				returnMemorySegment(segment);
-				return;
-			} else {
-				listener = registeredListeners.poll();
-
-				if (listener == null) {
-					availableMemorySegments.add(segment);
-					availableMemorySegments.notify();
+		NotificationResult notificationResult = NotificationResult.NONE;
+		while (!notificationResult.bufferUsed()) {
+			synchronized (availableMemorySegments) {
+				if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
+					returnMemorySegment(segment);
 					return;
+				} else {
+					listener = registeredListeners.poll();
+
+					if (listener == null) {
+						availableMemorySegments.add(segment);
+						availableMemorySegments.notify();
+						return;
+					}
 				}
 			}
-		}
-
-		// We do not know which locks have been acquired before the recycle() or are needed in the
-		// notification and which other threads also access them.
-		// -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
-		// Note that in case of any exceptions notifyBufferAvailable() should recycle the buffer
-		// (either directly or later during error handling) and therefore eventually end up in this
-		// method again.
-		boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-		if (needMoreBuffers) {
-			synchronized (availableMemorySegments) {
-				if (isDestroyed) {
-					// cleanup tasks how they would have been done if we only had one synchronized block
-					listener.notifyBufferDestroyed();
-				} else {
-					registeredListeners.add(listener);
+			// We do not know which locks have been acquired before the recycle() or are needed in the
Review comment:
Makes sense, the logic is indeed a little long for the `recycle` method.
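The loop in the diff replaces recursion with iteration. As a rough illustration only, the simplified pool below (all class, enum and method names are hypothetical, not Flink's actual API) hands a recycled segment to one listener after another inside a while loop, so a listener that declines the segment costs one loop iteration rather than one stack frame. It deliberately omits the pool-size check, the destroyed state and the listener re-registration that the real LocalBufferPool handles.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/**
 * Hypothetical sketch, not Flink code: recycle() offers a segment to registered
 * listeners in a loop instead of recursing back into itself.
 */
public class IterativeRecycleSketch {

    /** Hypothetical stand-in for the PR's NotificationResult. */
    enum Result {
        NOT_USED, USED;

        boolean bufferUsed() {
            return this == USED;
        }
    }

    interface Listener {
        // Returns NOT_USED (e.g. the listener was already released) instead of
        // recycling the segment back into the pool.
        Result notifyBufferAvailable(Object segment);
    }

    private final Object lock = new Object();
    private final Queue<Listener> listeners = new ArrayDeque<>();
    private final Queue<Object> available = new ArrayDeque<>();

    void register(Listener listener) {
        synchronized (lock) {
            listeners.add(listener);
        }
    }

    int availableCount() {
        synchronized (lock) {
            return available.size();
        }
    }

    void recycle(Object segment) {
        Result result = Result.NOT_USED;
        while (!result.bufferUsed()) {
            Listener listener;
            synchronized (lock) {
                listener = listeners.poll();
                if (listener == null) {
                    // Nobody wants the segment: keep it and wake a blocked requester.
                    available.add(segment);
                    lock.notify();
                    return;
                }
            }
            // Notify outside the lock, as the diff's FLINK-9676 comment requires.
            result = listener.notifyBufferAvailable(segment);
        }
    }

    public static void main(String[] args) {
        IterativeRecycleSketch pool = new IterativeRecycleSketch();
        // 100,000 "released" listeners that all decline the segment.
        for (int i = 0; i < 100_000; i++) {
            pool.register(seg -> Result.NOT_USED);
        }
        pool.recycle(new Object()); // constant stack depth, no recursion
        System.out.println(pool.availableCount()); // prints 1
    }
}
```

The lock is held only while polling the listener queue, so the stack depth of recycle() stays constant regardless of how many listeners decline the segment.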
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
> Avoid recursion stack overflow during releasing SingleInputGate
> ---------------------------------------------------------------
>
> Key: FLINK-10367
> URL: https://issues.apache.org/jira/browse/FLINK-10367
> Project: Flink
> Issue Type: Sub-task
> Components: Network
> Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0
> Reporter: zhijiang
> Assignee: zhijiang
> Priority: Minor
> Labels: pull-request-available
>
> On task failure or cancellation, {{SingleInputGate#releaseAllResources}}
> is invoked before the task exits.
> In {{SingleInputGate#releaseAllResources}}, we first loop to release all the
> input channels and then destroy the {{BufferPool}}. During
> {{RemoteInputChannel#releaseAllResources}}, each channel returns its floating
> buffers to the {{BufferPool}}, which assigns each recycled buffer to another
> listener (a {{RemoteInputChannel}}).
> This process can recurse. If that listener has already been released, it
> recycles the buffer straight back to the {{BufferPool}}, which polls another
> listener and notifies it of the available buffer. The cycle can then repeat,
> with each step nested one call deeper.
> If many input channels are registered as listeners in the {{BufferPool}},
> this recursion can cause a {{StackOverflowError}}. In our test job, 10,000
> input channels triggered this error.
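To make the recursion concrete, here is a hypothetical single-threaded model of the behaviour described above (not Flink code; the class and method names are invented for illustration). Every registered listener is an already-released channel that hands the buffer straight back to the pool. Instead of actually overflowing, the sketch counts the nesting depth, which grows by one per released listener; with enough listeners a real JVM throws StackOverflowError, at a threshold that depends on the thread stack size.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/**
 * Hypothetical model of the pre-fix behaviour: recycle() notifies a listener
 * directly, and a released listener recycles the buffer back into the pool,
 * which notifies the next listener, and so on.
 */
public class RecursiveRecycleSketch {

    interface Listener {
        void notifyBufferAvailable(Object segment);
    }

    private final Queue<Listener> listeners = new ArrayDeque<>();
    private final Queue<Object> available = new ArrayDeque<>();
    private int depth = 0;
    private int maxDepth = 0;

    void register(Listener listener) {
        listeners.add(listener);
    }

    int maxDepth() {
        return maxDepth;
    }

    void recycle(Object segment) {
        depth++;
        maxDepth = Math.max(maxDepth, depth);
        try {
            Listener listener = listeners.poll();
            if (listener == null) {
                available.add(segment);
            } else {
                // A released listener hands the buffer straight back: recursion.
                listener.notifyBufferAvailable(segment);
            }
        } finally {
            depth--;
        }
    }

    public static void main(String[] args) {
        RecursiveRecycleSketch pool = new RecursiveRecycleSketch();
        int channels = 1_000;
        for (int i = 0; i < channels; i++) {
            pool.register(pool::recycle); // every channel is already released
        }
        pool.recycle(new Object());
        // One nested recycle() call per released listener, plus the initial call.
        System.out.println(pool.maxDepth()); // prints 1001
    }
}
```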
> I see two ways to solve this potential problem:
> # When an input channel is released, it should unregister itself as a
> listener from the {{BufferPool}}; otherwise the two are left inconsistent.
> # {{SingleInputGate}} should destroy the {{BufferPool}} first and then loop to
> release all of its input channels. Destroying the pool removes all of its
> listeners, so returning buffers during
> {{RemoteInputChannel#releaseAllResources}} no longer notifies any other input
> channel.
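The second option can be sketched with two hypothetical classes, Pool and Channel, which are simplified stand-ins for BufferPool and RemoteInputChannel and not their real APIs. Destroying the pool first clears its listener queue, so buffers returned afterwards by releaseAllResources() bypass listener notification entirely. The returnedToGlobal counter is an assumption standing in for handing segments back to the global buffer pool.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/**
 * Hypothetical sketch of "destroy the pool first, then release the channels".
 */
public class DestroyFirstSketch {

    static class Pool {
        final Queue<Channel> listeners = new ArrayDeque<>();
        boolean destroyed = false;
        int returnedToGlobal = 0;
        int notifications = 0;

        void destroy() {
            destroyed = true;
            listeners.clear(); // all listeners are dropped during destroy
        }

        void recycle(Object segment) {
            if (destroyed) {
                returnedToGlobal++; // stand-in for returning the segment globally
                return;
            }
            Channel next = listeners.poll();
            if (next != null) {
                notifications++;
                next.onBufferAvailable(segment);
            }
        }
    }

    static class Channel {
        final Pool pool;
        final List<Object> floatingBuffers = new ArrayList<>();
        boolean released = false;

        Channel(Pool pool) {
            this.pool = pool;
        }

        void onBufferAvailable(Object segment) {
            if (released) {
                pool.recycle(segment); // the recursive path under the old ordering
            } else {
                floatingBuffers.add(segment);
            }
        }

        void releaseAllResources() {
            released = true;
            for (Object segment : floatingBuffers) {
                pool.recycle(segment);
            }
            floatingBuffers.clear();
        }
    }

    public static void main(String[] args) {
        Pool pool = new Pool();
        List<Channel> channels = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            Channel c = new Channel(pool);
            c.floatingBuffers.add(new Object());
            channels.add(c);
            pool.listeners.add(c);
        }
        // Option 2 ordering: destroy the pool, then release the channels.
        pool.destroy();
        for (Channel c : channels) {
            c.releaseAllResources();
        }
        System.out.println(pool.notifications + " " + pool.returnedToGlobal); // prints "0 10000"
    }
}
```

Because the listener queue is already empty when the channels release their buffers, no release can trigger a notification chain, whatever the number of channels.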
> I prefer the second approach, because we do not want to add another
> interface method for removing a buffer listener, and the current internal
> data structure in {{BufferPool}} cannot remove a listener directly.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)