AHeise commented on a change in pull request #13499:
URL: https://github.com/apache/flink/pull/13499#discussion_r496482074
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
##########
@@ -322,23 +350,71 @@ private MemorySegment requestMemorySegment() {
return requestMemorySegment(UNKNOWN_CHANNEL);
}
- @Nullable
- private MemorySegment requestMemorySegmentFromGlobal() {
- assert Thread.holdsLock(availableMemorySegments);
+ private boolean requestMemorySegmentFromGlobal() {
+ if (numberOfRequestedMemorySegments >= currentPoolSize) {
+ return false;
+ }
+
+ MemorySegment segment = networkBufferPool.requestMemorySegment();
+ if (segment != null) {
+ availableMemorySegments.add(segment);
+ numberOfRequestedMemorySegments++;
+ return true;
+ }
+ return false;
+ }
- if (isDestroyed) {
- throw new IllegalStateException("Buffer pool is destroyed.");
+ /**
+ * Tries to obtain a buffer from global pool as soon as one pool is available. Note that multiple
+ * {@link LocalBufferPool}s might wait on the future of the global pool, hence this method double-check if a new
+ * buffer is really needed at the time it becomes available.
+ */
+ private void eagerlyRequestMemorySegmentFromGlobal() {
+ if (eagerlyRequesting) {
+ return;
}
+ eagerlyRequesting = true;
+ networkBufferPool.getAvailableFuture().thenRun(() -> {
+ eagerlyRequesting = false;
+ if (availabilityHelper.isAvailable()) {
+ // there is currently no benefit for this pool to obtain buffer from global; give other pools precedent
+ return;
+ }
+ CompletableFuture<?> toNotify = null;
+ synchronized (availableMemorySegments) {
+ if (numberOfRequestedMemorySegments >= currentPoolSize) {
+ return;
+ }
+
+ // fetch a segment from global pool
+ if (requestMemorySegmentFromGlobal()) {
+ toNotify = availabilityHelper.getUnavailableToResetAvailable();
+ } else {
+ // segment probably taken by other pool, so retry later
+ eagerlyRequestMemorySegmentFromGlobal();
+ }
+ }
+ mayNotifyAvailable(toNotify);
+ });
+ }
+ private boolean checkAvailability() {
+ if (!availableMemorySegments.isEmpty()) {
+ return unavailableSubpartitionsCount == 0;
+ }
if (numberOfRequestedMemorySegments < currentPoolSize) {
- final MemorySegment segment = networkBufferPool.requestMemorySegment();
- if (segment != null) {
- numberOfRequestedMemorySegments++;
- return segment;
+ if (requestMemorySegmentFromGlobal()) {
+ return unavailableSubpartitionsCount == 0;
Review comment:
Yes.
I initially didn't poll it, and that would have affected only one more test
case; however, I decided against it. First of all, the logic is much easier to
reason about without having to worry about `unavailableSubpartitionsCount`: if
`availableMemorySegments` is empty and the pool is allowed to take more
buffers, it takes one.
It also helps to reach equilibrium much more quickly: every local pool holds
the segments it is assigned.
The clear downside is that if two local buffer pools are competing and one
of them has reached its quota, the wrong one may poll first, keeping both
unavailable.
I favored simplicity and quicker equilibrium over this edge case, but I can
revert to the earlier version if you think the edge case is very common.
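
To make the edge case concrete, here is a minimal, self-contained sketch. It is
not the Flink code: `ToyGlobalPool`, `ToyLocalPool`, and `run` are hypothetical
names, the global pool is reduced to a counter, and `checkAvailability` only
mirrors the part of the check visible in the hunk above (it simply returns
`false` where the real method continues). It assumes a global pool holding one
free segment and two local pools, one of which has already hit its quota.

```java
final class EagerPollSketch {

    /** Hypothetical stand-in for the global pool: only counts free segments. */
    static final class ToyGlobalPool {
        private int freeSegments;

        ToyGlobalPool(int freeSegments) {
            this.freeSegments = freeSegments;
        }

        boolean requestSegment() {
            if (freeSegments == 0) {
                return false;
            }
            freeSegments--;
            return true;
        }
    }

    /** Hypothetical stand-in for a local pool, reduced to what the check reads. */
    static final class ToyLocalPool {
        private final ToyGlobalPool global;
        private final int currentPoolSize;
        private final int unavailableSubpartitionsCount;
        private int availableSegments;
        private int numberOfRequestedSegments;

        ToyLocalPool(ToyGlobalPool global, int currentPoolSize, int unavailableSubpartitionsCount) {
            this.global = global;
            this.currentPoolSize = currentPoolSize;
            this.unavailableSubpartitionsCount = unavailableSubpartitionsCount;
        }

        /** Takes a segment whenever it is allowed to, regardless of the quota state. */
        boolean checkAvailability() {
            if (availableSegments > 0) {
                return unavailableSubpartitionsCount == 0;
            }
            if (numberOfRequestedSegments < currentPoolSize && global.requestSegment()) {
                availableSegments++;
                numberOfRequestedSegments++;
                return unavailableSubpartitionsCount == 0;
            }
            // Simplification: the real method continues past this point.
            return false;
        }
    }

    /** Returns {availability of the pool at quota, availability of the other pool}. */
    static boolean[] run(boolean quotaPoolFirst) {
        ToyGlobalPool global = new ToyGlobalPool(1);
        ToyLocalPool atQuota = new ToyLocalPool(global, 2, 1);
        ToyLocalPool other = new ToyLocalPool(global, 2, 0);
        boolean quotaAvailable;
        boolean otherAvailable;
        if (quotaPoolFirst) {
            quotaAvailable = atQuota.checkAvailability();
            otherAvailable = other.checkAvailability();
        } else {
            otherAvailable = other.checkAvailability();
            quotaAvailable = atQuota.checkAvailability();
        }
        return new boolean[] {quotaAvailable, otherAvailable};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run(true)));
        System.out.println(java.util.Arrays.toString(run(false)));
    }
}
```

In this toy setup, when the pool at quota polls first it takes the only free
segment yet stays unavailable, so the other pool finds the global pool empty
and is unavailable as well. With the opposite order the other pool becomes
available, which is the behavior the earlier version would have preserved.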