[ https://issues.apache.org/jira/browse/FLINK-10367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16643673#comment-16643673 ]
Nico Kruber commented on FLINK-10367:
-------------------------------------
As a rough sketch for option 3 (maybe this can be optimised):
{code}
public enum NotificationResult {
    NONE(false, false),
    BUFFER_USED_FINISHED(true, false),
    BUFFER_USED_NEED_MORE(true, true);

    private final boolean bufferUsed;
    private final boolean needsMoreBuffers;

    NotificationResult(boolean bufferUsed, boolean needsMoreBuffers) {
        this.bufferUsed = bufferUsed;
        this.needsMoreBuffers = needsMoreBuffers;
    }

    public boolean bufferUsed() {
        return bufferUsed;
    }

    public boolean needsMoreBuffers() {
        return needsMoreBuffers;
    }
}

@Override
public void recycle(MemorySegment segment) {
    BufferListener listener;
    NetworkBuffer buffer = null;
    NotificationResult notificationResult = NotificationResult.NONE; // some enum
    // Keep offering the segment until some listener actually uses it.
    while (!notificationResult.bufferUsed()) {
        synchronized (availableMemorySegments) {
            if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
                returnMemorySegment(segment);
                return;
            } else {
                listener = registeredListeners.poll();
                if (listener == null) {
                    availableMemorySegments.add(segment);
                    availableMemorySegments.notify();
                    return;
                }
            }
        }
        // We do not know which locks have been acquired before the recycle() or are needed in the
        // notification and which other threads also access them.
        // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
        // Note that in case of any exceptions notifyBufferAvailable() should recycle the buffer
        // (either directly or later during error handling) and therefore eventually end up in this
        // method again.
        if (buffer == null) {
            buffer = new NetworkBuffer(segment, this);
        }
        notificationResult = listener.notifyBufferAvailable(buffer);
        if (notificationResult.needsMoreBuffers()) {
            synchronized (availableMemorySegments) {
                if (isDestroyed) {
                    // cleanup tasks how they would have been done if we only had one synchronized block
                    listener.notifyBufferDestroyed();
                } else {
                    registeredListeners.add(listener);
                }
            }
        }
    }
}
{code}
Not really nice, but maybe better than adding code that ignores exceptions in
particular cases...
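
To make the control flow of that loop easier to follow, here is a small single-threaded toy model. It is only a sketch: all types in it ({{RecycleLoopSketch}}, {{Listener}}, {{ReleasedListener}}, {{ActiveListener}}, string "buffers") are hypothetical stand-ins and not Flink classes, and the locking and the isDestroyed handling are left out on purpose.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model of the iterative recycle loop; hypothetical stand-ins, not Flink code.
class RecycleLoopSketch {

    enum NotificationResult {
        NONE(false, false),
        BUFFER_USED_FINISHED(true, false),
        BUFFER_USED_NEED_MORE(true, true);

        private final boolean bufferUsed;
        private final boolean needsMoreBuffers;

        NotificationResult(boolean bufferUsed, boolean needsMoreBuffers) {
            this.bufferUsed = bufferUsed;
            this.needsMoreBuffers = needsMoreBuffers;
        }

        boolean bufferUsed() { return bufferUsed; }
        boolean needsMoreBuffers() { return needsMoreBuffers; }
    }

    interface Listener {
        NotificationResult notifyBufferAvailable(String buffer);
    }

    /** Stands in for an already released channel: it never takes the buffer. */
    static final class ReleasedListener implements Listener {
        @Override
        public NotificationResult notifyBufferAvailable(String buffer) {
            return NotificationResult.NONE;
        }
    }

    /** Stands in for a live channel that still misses a fixed number of buffers. */
    static final class ActiveListener implements Listener {
        final List<String> received = new ArrayList<>();
        private int missing;

        ActiveListener(int missing) { this.missing = missing; }

        @Override
        public NotificationResult notifyBufferAvailable(String buffer) {
            received.add(buffer);
            missing--;
            return missing > 0
                    ? NotificationResult.BUFFER_USED_NEED_MORE
                    : NotificationResult.BUFFER_USED_FINISHED;
        }
    }

    final Queue<Listener> registeredListeners = new ArrayDeque<>();
    final Queue<String> availableBuffers = new ArrayDeque<>();

    /** Offers the buffer to one listener after another in a loop, without recursion. */
    void recycle(String buffer) {
        NotificationResult result = NotificationResult.NONE;
        while (!result.bufferUsed()) {
            Listener listener = registeredListeners.poll();
            if (listener == null) {
                availableBuffers.add(buffer); // nobody is waiting, keep it in the pool
                return;
            }
            result = listener.notifyBufferAvailable(buffer);
            if (result.needsMoreBuffers()) {
                registeredListeners.add(listener); // re-register for the next buffer
            }
        }
    }

    /** Returns {buffers taken by the active listener, listeners left, buffers left in the pool}. */
    static int[] run(int releasedListeners) {
        RecycleLoopSketch pool = new RecycleLoopSketch();
        for (int i = 0; i < releasedListeners; i++) {
            pool.registeredListeners.add(new ReleasedListener());
        }
        ActiveListener active = new ActiveListener(2);
        pool.registeredListeners.add(active);
        pool.recycle("b1");
        pool.recycle("b2");
        pool.recycle("b3");
        return new int[] {
            active.received.size(), pool.registeredListeners.size(), pool.availableBuffers.size()
        };
    }
}
```

In this model the released listeners are simply skipped by the loop, so the stack depth stays constant no matter how many of them are queued in front of the active one.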
> Avoid recursion stack overflow during releasing SingleInputGate
> ---------------------------------------------------------------
>
> Key: FLINK-10367
> URL: https://issues.apache.org/jira/browse/FLINK-10367
> Project: Flink
> Issue Type: Improvement
> Components: Network
> Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0
> Reporter: zhijiang
> Assignee: zhijiang
> Priority: Minor
>
> For task failure or canceling, the {{SingleInputGate#releaseAllResources}}
> will be invoked before task exits.
> In {{SingleInputGate#releaseAllResources}}, we first loop to
> release all the input channels, then destroy the {{BufferPool}}.
> {{RemoteInputChannel#releaseAllResources}} returns floating buffers
> to the {{BufferPool}}, which assigns each recycled buffer to one of the other
> listeners ({{RemoteInputChannel}}).
> This process may involve recursive calls. If the listener has already been
> released, it directly recycles the buffer to the {{BufferPool}},
> which takes another listener and notifies it of the available buffer. This
> may repeat recursively.
> If many input channels are registered as listeners in the {{BufferPool}}, the
> recursion causes a {{StackOverflow}} error. In our testing job,
> a scale of 10,000 input channels caused this error.
> I can think of two ways to solve this potential problem:
> # When the input channel is released, it should notify the {{BufferPool}} to
> unregister this listener; otherwise the two become inconsistent.
> # {{SingleInputGate}} should destroy the {{BufferPool}} first, then loop to
> release all the internal input channels. That way, all the listeners in the
> {{BufferPool}} are removed during destruction, and the input channel has
> no further interactions during
> {{RemoteInputChannel#releaseAllResources}}.
> I prefer the second way, because we do not want to
> add another interface method for removing a buffer listener; furthermore,
> the current internal data structure in {{BufferPool}} does not support
> removing a listener directly.
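
The recursion described in the quoted issue, and the effect of the second proposal, can be reproduced with a toy model. This is a sketch under assumptions: {{RecursiveRecycleSketch}}, its {{Listener}} interface and {{destroy()}} are hypothetical names, every listener is assumed to be already released, and a released listener is modelled as handing the buffer straight back to the pool.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model of the recursive recycle path; hypothetical stand-ins, not Flink code.
class RecursiveRecycleSketch {

    interface Listener {
        void notifyBufferAvailable(String buffer);
    }

    final Queue<Listener> registeredListeners = new ArrayDeque<>();
    int availableBuffers = 0;
    int depth = 0;    // current nesting of recycle() calls
    int maxDepth = 0; // deepest nesting observed

    void recycle(String buffer) {
        depth++;
        maxDepth = Math.max(maxDepth, depth);
        try {
            Listener listener = registeredListeners.poll();
            if (listener == null) {
                availableBuffers++;
            } else {
                // The notification runs inside recycle(), so a listener that
                // recycles the buffer again nests one level deeper.
                listener.notifyBufferAvailable(buffer);
            }
        } finally {
            depth--;
        }
    }

    /** Models destroying the pool: all registered listeners are dropped. */
    void destroy() {
        registeredListeners.clear();
    }

    /** Returns the deepest recycle() nesting seen while one buffer is returned. */
    static int maxDepthFor(int releasedListeners, boolean destroyPoolFirst) {
        RecursiveRecycleSketch pool = new RecursiveRecycleSketch();
        for (int i = 0; i < releasedListeners; i++) {
            // An already released listener hands the buffer straight back.
            pool.registeredListeners.add(pool::recycle);
        }
        if (destroyPoolFirst) {
            pool.destroy();
        }
        pool.recycle("segment");
        return pool.maxDepth;
    }
}
```

Here {{maxDepthFor(n, false)}} grows linearly with the number of listeners (one nested {{recycle()}} call per released listener plus the initial call), while {{maxDepthFor(n, true)}} stays at 1 because no listener is left to notify.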
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)