[ https://issues.apache.org/jira/browse/FLINK-9755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16540299#comment-16540299 ]
ASF GitHub Bot commented on FLINK-9755:
---------------------------------------
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6272#discussion_r201744892
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
@@ -277,37 +277,17 @@ public void recycle(MemorySegment segment) {
     // We do not know which locks have been acquired before the recycle() or are needed in the
     // notification and which other threads also access them.
     // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
-    boolean success = false;
-    boolean needMoreBuffers = false;
-    try {
-        needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-        success = true;
-    } catch (Throwable ignored) {
-        // handled below, under the lock
-    }
+    // Note that in case of any exceptions notifyBufferAvailable() should recycle the buffer and
+    // therefore end up in this method again.
+    needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-    if (!success || needMoreBuffers) {
+    if (needMoreBuffers) {
         synchronized (availableMemorySegments) {
             if (isDestroyed) {
                 // cleanup tasks how they would have been done if we only had one synchronized block
-                if (needMoreBuffers) {
-                    listener.notifyBufferDestroyed();
-                }
-                if (!success) {
-                    returnMemorySegment(segment);
-                }
+                listener.notifyBufferDestroyed();
             } else {
-                if (needMoreBuffers) {
-                    registeredListeners.add(listener);
-                }
-                if (!success) {
-                    if (numberOfRequestedMemorySegments > currentPoolSize) {
-                        returnMemorySegment(segment);
-                    } else {
-                        availableMemorySegments.add(segment);
-                        availableMemorySegments.notify();
-                    }
-                }
+                registeredListeners.add(listener);
--- End diff --
If an exception is thrown, this will not be called. Is that how
it is supposed to be?
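
To make the question concrete, here is a minimal, self-contained sketch of the control flow after the change. It is not the Flink code: `RecycleSketch`, its `Listener` interface, and the `String` stand-in for a buffer are hypothetical simplifications, and the real `LocalBufferPool` does considerably more. It only illustrates that an exception thrown by the callback leaves `recycle()` before the listener can be re-registered.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class RecycleSketch {

    /** Hypothetical stand-in for a buffer listener; not Flink's real interface. */
    interface Listener {
        /** Returns true if the listener wants to be notified about further buffers. */
        boolean notifyBufferAvailable(String buffer);
    }

    private final Queue<Listener> registeredListeners = new ArrayDeque<>();

    void register(Listener listener) {
        synchronized (registeredListeners) {
            registeredListeners.add(listener);
        }
    }

    int registeredCount() {
        synchronized (registeredListeners) {
            return registeredListeners.size();
        }
    }

    /** Simplified recycle(): hand the segment to one waiting listener, if any. */
    void recycle(String segment) {
        Listener listener;
        synchronized (registeredListeners) {
            listener = registeredListeners.poll();
        }
        if (listener == null) {
            return;
        }
        // Called outside the lock, as in the diff. If this throws, nothing below runs.
        boolean needMoreBuffers = listener.notifyBufferAvailable(segment);
        if (needMoreBuffers) {
            synchronized (registeredListeners) {
                registeredListeners.add(listener);
            }
        }
    }

    public static void main(String[] args) {
        RecycleSketch pool = new RecycleSketch();
        pool.register(buffer -> {
            throw new IllegalStateException("simulated failure in callback");
        });
        try {
            pool.recycle("segment");
        } catch (IllegalStateException e) {
            // The exception reaches the caller of recycle() and the listener is gone.
            System.out.println("registered listeners after failure: " + pool.registeredCount());
        }
    }
}
```

In this sketch the failing listener is dropped and the exception surfaces in whichever thread called `recycle()`, which may or may not be the behaviour the PR intends.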
> Exceptions in RemoteInputChannel#notifyBufferAvailable() are not propagated
> to the responsible thread
> -----------------------------------------------------------------------------------------------------
>
> Key: FLINK-9755
> URL: https://issues.apache.org/jira/browse/FLINK-9755
> Project: Flink
> Issue Type: Bug
> Components: Network
> Affects Versions: 1.5.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.5.2, 1.6.0
>
>
> The credit-based flow control implementation of
> RemoteInputChannel#notifyBufferAvailable() does not forward errors (like the
> {{IllegalStateException}}) to the thread that is being notified. The calling
> code at {{LocalBufferPool#recycle}}, however, relies on the callback
> forwarding errors and completely ignores any failures.
> Therefore, we could end up with a program waiting forever for the callback
> and not even a failure message in the logs.
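
One way to picture "forwarding errors to the thread that is being notified" is sketched below. This is only an illustration under assumptions, not the fix from the pull request: `ErrorForwardingSketch`, `Channel`, `release()`, and `checkError()` are hypothetical names chosen for this example. The callback records the failure instead of letting it vanish, and the responsible thread rethrows it the next time it checks.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

public class ErrorForwardingSketch {

    /** Hypothetical channel; a simplified stand-in, not Flink's RemoteInputChannel. */
    static class Channel {
        private final AtomicReference<Throwable> cause = new AtomicReference<>();
        private volatile boolean released;

        void release() {
            released = true;
        }

        /** Callback invoked by the buffer pool, possibly from a different thread. */
        boolean notifyBufferAvailable(Object buffer) {
            try {
                if (released) {
                    throw new IllegalStateException("channel already released");
                }
                return true; // still interested in more buffers
            } catch (Throwable t) {
                // Remember the first failure for the responsible thread.
                cause.compareAndSet(null, t);
                return false;
            }
        }

        /** Called by the responsible thread; surfaces a recorded failure. */
        void checkError() throws IOException {
            Throwable t = cause.get();
            if (t != null) {
                throw new IOException("buffer notification failed", t);
            }
        }
    }

    public static void main(String[] args) {
        Channel channel = new Channel();
        channel.release();
        channel.notifyBufferAvailable(new Object());
        try {
            channel.checkError();
        } catch (IOException e) {
            System.out.println("forwarded: " + e.getCause().getMessage());
        }
    }
}
```

With this shape, the caller of the callback never has to guess whether a failure occurred, and the failure ends up in the thread that can act on it rather than being silently dropped.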