GitHub user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4499#discussion_r140829015
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
---
@@ -215,6 +269,48 @@ public String toString() {
}
// ------------------------------------------------------------------------
+ // Credit-based
+ // ------------------------------------------------------------------------
+
+ /**
+ * Enqueue this input channel in the pipeline for sending unannounced credits to producer.
+ */
+ void notifyCreditAvailable() {
+ //TODO in next PR
+ }
+
+ /**
+ * Exclusive buffer is recycled to this input channel directly and it may trigger notify
+ * credit to producer.
+ *
+ * @param segment The exclusive segment of this channel.
+ */
+ @Override
+ public void recycle(MemorySegment segment) {
+ synchronized (availableBuffers) {
+ if (isReleased.get()) {
+ try {
+ inputGate.returnExclusiveSegments(Arrays.asList(segment));
+ return;
+ } catch (Throwable t) {
+ ExceptionUtils.rethrow(t);
+ }
+ }
--- End diff --
Could you add a comment explaining why the `isReleased` check has to be
inside the synchronized block (`onBuffer` did the same before, also without
a comment)? `isReleased` is an `AtomicBoolean`, so the lock is not needed to
read the flag itself; the point is to avoid a race with a concurrent release
here.
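
To illustrate the kind of race I mean, here is a minimal sketch. It is not the actual `RemoteInputChannel` code: `ChannelSketch`, `returnedToPool`, `release()` and `queued()` are hypothetical stand-ins, and it assumes that releasing sets the flag first and then drains `availableBuffers` under the same lock.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the channel; not the actual Flink class. */
class ChannelSketch {
    private final ArrayDeque<Object> availableBuffers = new ArrayDeque<>();
    private final AtomicBoolean isReleased = new AtomicBoolean(false);

    /** Stand-in for segments handed back to the gate; only touched under the lock. */
    final List<Object> returnedToPool = new ArrayList<>();

    void recycle(Object segment) {
        synchronized (availableBuffers) {
            // The check must happen while holding the lock: release() drains the
            // queue under the same lock, so either we see the flag and hand the
            // segment back, or we enqueue before the drain and it gets drained.
            // Checking outside the lock could enqueue after the drain and leak
            // the segment.
            if (isReleased.get()) {
                returnedToPool.add(segment);
                return;
            }
            availableBuffers.add(segment);
        }
    }

    void release() {
        // Assumed ordering: set the flag first, then drain under the lock.
        if (isReleased.compareAndSet(false, true)) {
            synchronized (availableBuffers) {
                returnedToPool.addAll(availableBuffers);
                availableBuffers.clear();
            }
        }
    }

    int queued() {
        synchronized (availableBuffers) {
            return availableBuffers.size();
        }
    }
}
```

A one- or two-line comment in `recycle` along the lines of the one in the sketch would be enough.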
---