xintongsong commented on code in PR #20924:
URL: https://github.com/apache/flink/pull/20924#discussion_r1043241608
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -130,6 +130,10 @@ class LocalBufferPool implements BufferPool {
@GuardedBy("availableMemorySegments")
private final AvailabilityHelper availabilityHelper = new
AvailabilityHelper();
+ /**
+ * Indicates this {@link LocalBufferPool} will request buffer from global
pool when it becomes
+ * available.
+ */
Review Comment:
```suggestion
/**
* Indicates whether this {@link LocalBufferPool} has requested to be
notified on the next time that global pool becoming available, so it can then
request buffer from the global pool.
*/
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -757,4 +781,47 @@ public void recycle(MemorySegment memorySegment) {
bufferPool.recycle(memorySegment, channel);
}
}
+
+ /**
+ * This class represents the buffer pool's current ground-truth
availability and whether to
+ * request buffer from global pool when it is available.
+ */
+ private enum AvailabilityStatus {
+ AVAILABLE(true, false),
+ UNAVAILABLE_NEED_REQUEST_FROM_GLOBAL(false, true),
+ UNAVAILABLE_NEED_NOT_REQUEST_FROM_GLOBAL(false, false);
+
+ /** Indicates whether the {@link LocalBufferPool} is currently
available. */
+ private final boolean available;
+
+ /** Indicates whether to request buffer from globalPool when it is
available. */
+ private final boolean needRequestFromGlobalWhenAvailable;
Review Comment:
See comment for `requestingWhenAvailable`.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -652,13 +674,15 @@ public void setNumBuffers(int numBuffers) {
// one buffer from NetworkBufferPool
return;
}
-
- if (checkAvailability()) {
+ AvailabilityStatus availabilityAndRequestFromGlobalPool =
checkAvailability();
+ if (availabilityAndRequestFromGlobalPool.isAvailable()) {
toNotify = availabilityHelper.getUnavailableToResetAvailable();
} else {
availabilityHelper.resetUnavailable();
}
-
+ if
(availabilityAndRequestFromGlobalPool.isNeedRequestFromGlobalWhenAvailable()) {
+ requestMemorySegmentFromGlobalWhenAvailable();
+ }
checkConsistentAvailability();
Review Comment:
This common pattern has repeated 4 times. It would be nice to deduplicate it.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -757,4 +781,47 @@ public void recycle(MemorySegment memorySegment) {
bufferPool.recycle(memorySegment, channel);
}
}
+
+ /**
+ * This class represents the buffer pool's current ground-truth
availability and whether to
+ * request buffer from global pool when it is available.
+ */
+ private enum AvailabilityStatus {
+ AVAILABLE(true, false),
+ UNAVAILABLE_NEED_REQUEST_FROM_GLOBAL(false, true),
+ UNAVAILABLE_NEED_NOT_REQUEST_FROM_GLOBAL(false, false);
Review Comment:
See comment for `requestingWhenAvailable`.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -130,6 +130,10 @@ class LocalBufferPool implements BufferPool {
@GuardedBy("availableMemorySegments")
private final AvailabilityHelper availabilityHelper = new
AvailabilityHelper();
+ /**
+ * Indicates this {@link LocalBufferPool} will request buffer from global
pool when it becomes
+ * available.
+ */
Review Comment:
I'd suggest to also rename `requestingWhenAvailable` to
`requestingNotificationOfGlobalPoolAvailable`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]