Github user zhijiangW commented on a diff in the pull request:
https://github.com/apache/flink/pull/6272#discussion_r201242662
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
---
@@ -360,32 +360,45 @@ public boolean notifyBufferAvailable(Buffer buffer) {
return false;
}
- boolean needMoreBuffers = false;
- synchronized (bufferQueue) {
- checkState(isWaitingForFloatingBuffers, "This channel should be waiting for floating buffers.");
+ boolean recycleBuffer = true;
+ try {
+ boolean needMoreBuffers = false;
+ synchronized (bufferQueue) {
+ checkState(isWaitingForFloatingBuffers,
+ "This channel should be waiting for floating buffers.");
+
+ // Important: double check the isReleased state inside synchronized block, so there is no
+ // race condition when notifyBufferAvailable and releaseAllResources running in parallel.
+ if (isReleased.get() ||
+ bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
+ isWaitingForFloatingBuffers = false;
+ buffer.recycleBuffer();
+ return false;
+ }
- // Important: double check the isReleased state inside synchronized block, so there is no
- // race condition when notifyBufferAvailable and releaseAllResources running in parallel.
- if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
- isWaitingForFloatingBuffers = false;
- buffer.recycleBuffer();
- return false;
- }
+ // note: this call may fail, for better cleanup, increase the counter first
+ if (unannouncedCredit.getAndAdd(1) == 0) {
+ notifyCreditAvailable();
--- End diff --
From a failure-handling point of view, `notifyCreditAvailable` should be called before
`bufferQueue.addFloatingBuffer`. From a correctness point of view, however, it seems
stricter to confirm that the buffer is queued before announcing the credit; otherwise
the channel may receive new data before this floating buffer has been queued, although
that can hardly happen with the current implementation.
Another point is that `notifyCreditAvailable` is itself thread-safe, so it does not
strictly need the lock. To keep failure handling simple, though, we place it inside the
`synchronized` block. Currently `notifyCreditAvailable` is very lightweight, so the
extra cost is negligible.
Maybe these two concerns are unnecessary, and I can accept the current
modifications.
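
To make the ordering trade-off concrete, here is a minimal, self-contained sketch. It is not the actual `RemoteInputChannel` code: `FakeBuffer`, `Channel`, the `failOnNotify` switch and the `finally`-based recycling are all hypothetical stand-ins that I assume for illustration. It only shows why announcing the credit before queuing the buffer keeps cleanup simple when the notification fails.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicInteger;

public class CreditOrderingSketch {

    /** Hypothetical stand-in for a network buffer; it only records recycling. */
    static final class FakeBuffer {
        boolean recycled;

        void recycleBuffer() {
            recycled = true;
        }
    }

    /** Simplified channel (assumption), not the real RemoteInputChannel. */
    static final class Channel {
        private final ArrayDeque<FakeBuffer> bufferQueue = new ArrayDeque<>();
        private final AtomicInteger unannouncedCredit = new AtomicInteger();
        private final boolean failOnNotify;

        Channel(boolean failOnNotify) {
            this.failOnNotify = failOnNotify;
        }

        // Hypothetical notification that can be made to fail for the demo.
        private void notifyCreditAvailable() {
            if (failOnNotify) {
                throw new IllegalStateException("simulated notification failure");
            }
        }

        void notifyBufferAvailable(FakeBuffer buffer) {
            boolean recycleBuffer = true;
            try {
                synchronized (bufferQueue) {
                    // Increase the counter and announce first: if this throws,
                    // the buffer has not been queued yet and can be recycled.
                    if (unannouncedCredit.getAndAdd(1) == 0) {
                        notifyCreditAvailable();
                    }
                    bufferQueue.add(buffer);
                    recycleBuffer = false; // ownership moved to the queue
                }
            } finally {
                if (recycleBuffer) {
                    buffer.recycleBuffer();
                }
            }
        }

        int queuedBuffers() {
            synchronized (bufferQueue) {
                return bufferQueue.size();
            }
        }

        int unannouncedCredit() {
            return unannouncedCredit.get();
        }
    }

    public static void main(String[] args) {
        Channel healthy = new Channel(false);
        FakeBuffer first = new FakeBuffer();
        healthy.notifyBufferAvailable(first);
        System.out.println("healthy: queued=" + healthy.queuedBuffers()
                + ", recycled=" + first.recycled);

        Channel failing = new Channel(true);
        FakeBuffer second = new FakeBuffer();
        try {
            failing.notifyBufferAvailable(second);
        } catch (IllegalStateException expected) {
            // The buffer was never queued, so the finally block recycled it.
        }
        System.out.println("failing: queued=" + failing.queuedBuffers()
                + ", recycled=" + second.recycled);
    }
}
```

With the opposite order (queue first, then announce), a failing notification would leave the buffer in the queue while the caller still believes it has to recycle it, so the cleanup path would need to remove it from the queue again.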
---