zhijiangW commented on a change in pull request #11687:
URL: https://github.com/apache/flink/pull/11687#discussion_r419467829
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -321,111 +274,43 @@ private void notifyCreditAvailable() {
partitionRequestClient.notifyCreditAvailable(this);
}
- /**
- * Exclusive buffer is recycled to this input channel directly and it
may trigger return extra
- * floating buffer and notify increased credit to the producer.
- *
- * @param segment The exclusive segment of this channel.
- */
- @Override
- public void recycle(MemorySegment segment) {
- int numAddedBuffers;
-
- synchronized (bufferQueue) {
- // Similar to notifyBufferAvailable(), make sure that
we never add a buffer
- // after releaseAllResources() released all buffers
(see below for details).
- if (isReleased.get()) {
- try {
-
memorySegmentProvider.recycleMemorySegments(Collections.singletonList(segment));
- return;
- } catch (Throwable t) {
- ExceptionUtils.rethrow(t);
- }
- }
- numAddedBuffers = bufferQueue.addExclusiveBuffer(new
NetworkBuffer(segment, this), numRequiredBuffers);
- }
-
- if (numAddedBuffers > 0 &&
unannouncedCredit.getAndAdd(numAddedBuffers) == 0) {
- notifyCreditAvailable();
- }
- }
-
public int getNumberOfAvailableBuffers() {
- synchronized (bufferQueue) {
- return bufferQueue.getAvailableBufferSize();
- }
+ return bufferManager.getNumberOfAvailableBuffers();
}
public int getNumberOfRequiredBuffers() {
- return numRequiredBuffers;
+ return bufferManager.getNumberOfRequiredBuffers();
}
Review comment:
Yes, it could be as you said. Actually it should be a separate hotfix if
we want to fix it in this PR.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]