Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4509#discussion_r141901569
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
@@ -209,6 +276,95 @@ public String toString() {
}
// ------------------------------------------------------------------------
+ // Credit-based
+ // ------------------------------------------------------------------------
+
+ /**
+ * Enqueue this input channel in the pipeline for sending unannounced credits to producer.
+ */
+ void notifyCreditAvailable() {
+ //TODO in next PR
+ }
+
+ /**
+ * Exclusive buffer is recycled to this input channel directly and it may trigger notify
+ * credit to producer.
+ *
+ * @param segment The exclusive segment of this channel.
+ */
+ @Override
+ public void recycle(MemorySegment segment) {
+ synchronized (availableBuffers) {
+ // Important: the isReleased check should be inside the synchronized block.
+ // that way the segment can also be returned to global pool after added into
+ // the available queue during releasing all resources.
+ if (isReleased.get()) {
+ try {
+ inputGate.returnExclusiveSegments(Arrays.asList(segment));
+ return;
+ } catch (Throwable t) {
+ ExceptionUtils.rethrow(t);
+ }
+ }
+ availableBuffers.add(new Buffer(segment, this));
+ }
+
+ if (unannouncedCredit.getAndAdd(1) == 0) {
+ notifyCreditAvailable();
+ }
+ }
+
+ public int getNumberOfAvailableBuffers() {
+ synchronized (availableBuffers) {
+ return availableBuffers.size();
+ }
+ }
+
+ /**
+ * The Buffer pool notifies this channel of an available floating buffer. If the channel is released or
+ * currently does not need extra buffers, the buffer should be recycled to the buffer pool. Otherwise,
+ * the buffer will be added into the <tt>availableBuffers</tt> queue and the unannounced credit is
+ * increased by one.
+ *
+ * @param buffer Buffer that becomes available in buffer pool.
+ * @return True when this channel is waiting for more floating buffers, otherwise false.
+ */
+ @Override
+ public boolean notifyBufferAvailable(Buffer buffer) {
+ checkState(isWaitingForFloatingBuffers.get(), "This channel should be waiting for floating buffers.");
+
+ synchronized (availableBuffers) {
+ // Important: the isReleased check should be inside the synchronized block.
+ if (isReleased.get() || availableBuffers.size() >= senderBacklog.get()) {
+ isWaitingForFloatingBuffers.set(false);
+ buffer.recycle();
+
+ return false;
+ }
+
+ availableBuffers.add(buffer);
+
+ if (unannouncedCredit.getAndAdd(1) == 0) {
+ notifyCreditAvailable();
+ }
--- End diff --
Can we do this credit check and the `notifyCreditAvailable()` call outside the `synchronized` block, as `recycle()` does above?
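
For illustration, a minimal sketch of moving the credit announcement out of the lock. `CreditChannelSketch` is a hypothetical stand-in, not the actual `RemoteInputChannel`: buffers are plain `Object`s, `senderBacklog` is a fixed constructor argument, `notifyCreditAvailable()` only counts calls, and the `true` return on the success path is a simplification, since the rest of the real method is not visible in this diff.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified stand-in for the channel in this diff; not Flink code. */
class CreditChannelSketch {
    private final ArrayDeque<Object> availableBuffers = new ArrayDeque<>();
    private final AtomicInteger unannouncedCredit = new AtomicInteger();
    private final AtomicBoolean isReleased = new AtomicBoolean();
    private final AtomicBoolean isWaitingForFloatingBuffers = new AtomicBoolean(true);
    private final int senderBacklog; // assumption: fixed here, an atomic field in the diff

    /** Counts notifications so the sketch can be checked; the diff leaves this as a TODO. */
    final AtomicInteger notifications = new AtomicInteger();

    CreditChannelSketch(int senderBacklog) {
        this.senderBacklog = senderBacklog;
    }

    void notifyCreditAvailable() {
        notifications.incrementAndGet();
    }

    int getNumberOfAvailableBuffers() {
        synchronized (availableBuffers) {
            return availableBuffers.size();
        }
    }

    boolean notifyBufferAvailable(Object buffer) {
        synchronized (availableBuffers) {
            // The released/backlog check and the queue update stay under the lock.
            if (isReleased.get() || availableBuffers.size() >= senderBacklog) {
                isWaitingForFloatingBuffers.set(false);
                // The real code would recycle the buffer here.
                return false;
            }
            availableBuffers.add(buffer);
        }

        // The counter is atomic, so the 0 -> 1 transition can be detected
        // without holding the lock, the same way recycle() does it.
        if (unannouncedCredit.getAndAdd(1) == 0) {
            notifyCreditAvailable();
        }
        return true; // simplification, see the note above
    }
}
```

With a backlog of 2, the first two calls enqueue a buffer and only the first one triggers a notification; the third call is rejected. Keeping the notification out of the critical section means whatever `notifyCreditAvailable()` ends up doing never runs while `availableBuffers` is locked.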
---