GitHub user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/6272#discussion_r201971896
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
---
@@ -360,32 +360,44 @@ public boolean notifyBufferAvailable(Buffer buffer) {
return false;
}
- boolean needMoreBuffers = false;
- synchronized (bufferQueue) {
- checkState(isWaitingForFloatingBuffers, "This channel should be waiting for floating buffers.");
+ boolean recycleBuffer = true;
+ try {
+ boolean needMoreBuffers = false;
+ synchronized (bufferQueue) {
+ checkState(isWaitingForFloatingBuffers,
+ "This channel should be waiting for floating buffers.");
+
+ // Important: double check the isReleased state inside synchronized block, so there is no
+ // race condition when notifyBufferAvailable and releaseAllResources running in parallel.
+ if (isReleased.get() ||
+ bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
+ isWaitingForFloatingBuffers = false;
+ buffer.recycleBuffer();
+ return false;
+ }
- // Important: double check the isReleased state inside synchronized block, so there is no
- // race condition when notifyBufferAvailable and releaseAllResources running in parallel.
- if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
- isWaitingForFloatingBuffers = false;
- buffer.recycleBuffer();
- return false;
- }
+ recycleBuffer = false;
+ bufferQueue.addFloatingBuffer(buffer);
- bufferQueue.addFloatingBuffer(buffer);
+ if (bufferQueue.getAvailableBufferSize() == numRequiredBuffers) {
+ isWaitingForFloatingBuffers = false;
+ } else {
+ needMoreBuffers = true;
+ }
- if (bufferQueue.getAvailableBufferSize() == numRequiredBuffers) {
- isWaitingForFloatingBuffers = false;
- } else {
- needMoreBuffers = true;
+ if (unannouncedCredit.getAndAdd(1) == 0) {
+ notifyCreditAvailable();
+ }
}
- }
- if (unannouncedCredit.getAndAdd(1) == 0) {
- notifyCreditAvailable();
+ return needMoreBuffers;
+ } catch (Throwable t) {
+ if (recycleBuffer) {
+ buffer.recycleBuffer();
+ }
+ setError(t);
+ return false;
--- End diff --
The protection is (currently) against the `checkState` here and the one in
`notifyCreditAvailable()` (the latter also makes some external calls).
We could return `needMoreBuffers` here, but I actually treat this exception as
if it had happened in the responsible thread, so we return `false` and do not
continue to use this listener.
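
To illustrate the ownership rule behind the `recycleBuffer` flag, here is a minimal sketch. It is not the actual `RemoteInputChannel` code: `FakeBuffer`, `ListenerSketch`, the `failOnNotify` switch and the `error` field (standing in for `setError(t)`) are hypothetical names made up for this example. The buffer is recycled in the `catch` block only if the failure happened before it was handed to the queue, and any failure makes the method return `false`.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for Flink's Buffer; it only counts recycle calls.
class FakeBuffer {
    int recycleCount = 0;

    void recycleBuffer() {
        recycleCount++;
    }
}

// Simplified, hypothetical model of the listener; not the real RemoteInputChannel.
class ListenerSketch {
    private final ArrayDeque<FakeBuffer> bufferQueue = new ArrayDeque<>();
    private final AtomicInteger unannouncedCredit = new AtomicInteger();
    private final int numRequiredBuffers;
    private final boolean failOnNotify; // assumption: simulates a failing external call

    boolean isWaitingForFloatingBuffers = true;
    Throwable error; // assumption: stands in for setError(t)

    ListenerSketch(int numRequiredBuffers, boolean failOnNotify) {
        this.numRequiredBuffers = numRequiredBuffers;
        this.failOnNotify = failOnNotify;
    }

    private void notifyCreditAvailable() {
        if (failOnNotify) {
            throw new IllegalStateException("simulated failure while announcing credit");
        }
    }

    boolean notifyBufferAvailable(FakeBuffer buffer) {
        boolean recycleBuffer = true; // we still own the buffer
        try {
            boolean needMoreBuffers = false;
            synchronized (bufferQueue) {
                if (!isWaitingForFloatingBuffers) {
                    throw new IllegalStateException(
                        "This channel should be waiting for floating buffers.");
                }

                recycleBuffer = false; // ownership moves to the queue
                bufferQueue.add(buffer);

                if (bufferQueue.size() == numRequiredBuffers) {
                    isWaitingForFloatingBuffers = false;
                } else {
                    needMoreBuffers = true;
                }

                if (unannouncedCredit.getAndAdd(1) == 0) {
                    notifyCreditAvailable();
                }
            }
            return needMoreBuffers;
        } catch (Throwable t) {
            if (recycleBuffer) {
                buffer.recycleBuffer(); // failed before the hand-over: give it back
            }
            error = t;
            return false; // treat the listener as failed; do not ask for more buffers
        }
    }
}
```

In this sketch a failing state check recycles the buffer exactly once, while a failure in `notifyCreditAvailable()` leaves the buffer in the queue (to be cleaned up with the queue) and still returns `false`.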
---