Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4485#discussion_r136018729
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
---
@@ -131,6 +135,59 @@ public void recycle(MemorySegment segment) {
availableMemorySegments.add(segment);
}
+ public List<MemorySegment> requestMemorySegments(int
numRequiredBuffers) throws IOException {
+ checkArgument(numRequiredBuffers > 0, "The number of required
buffers should be larger than 0.");
+
+ synchronized (factoryLock) {
+ if (isDestroyed) {
+ throw new IllegalStateException("Network buffer
pool has already been destroyed.");
+ }
+
+ if (numTotalRequiredBuffers + numRequiredBuffers >
totalNumberOfMemorySegments) {
+ throw new
IOException(String.format("Insufficient number of network buffers: " +
+ "required %d,
but only %d available. The total number of network " +
+ "buffers is
currently set to %d of %d bytes each. You can increase this " +
+ "number by
setting the configuration keys '%s', '%s', and '%s'.",
+ numRequiredBuffers,
+ totalNumberOfMemorySegments -
numTotalRequiredBuffers,
+ totalNumberOfMemorySegments,
+ memorySegmentSize,
+
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
+
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
+
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key()));
+ }
+
+ this.numTotalRequiredBuffers += numRequiredBuffers;
+
+ final List<MemorySegment> segments = new
ArrayList<>(numRequiredBuffers);
+ for (int i = 0 ; i < numRequiredBuffers ; i++) {
+ segments.add(availableMemorySegments.poll());
+ }
+
+ try {
+ redistributeBuffers();
--- End diff --
There are still some corner cases not handled properly by this
implementation: consider the following unit test (please also add it to
`NetworkBufferPoolTest` or `BufferPoolFactoryTest`):
```
/**
* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the
{@link NetworkBufferPool}
* currently not containing the number of required free segments
(currently occupied by a buffer
* pool).
*/
@Test
public void testRequestMemorySegmentsWithBuffersTaken() throws
IOException, InterruptedException {
final int numBuffers = 10;
NetworkBufferPool networkBufferPool = new
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
final List<Buffer> buffers = new ArrayList<>(numBuffers);
List<MemorySegment> memorySegments = Collections.emptyList();
Thread bufferRecycler = null;
BufferPool lbp1 = null;
try {
lbp1 = networkBufferPool.createBufferPool(numBuffers /
2, numBuffers);
// take all buffers (more than the minimum required)
for (int i = 0; i < numBuffers; ++i) {
Buffer buffer = lbp1.requestBuffer();
buffers.add(buffer);
assertNotNull(buffer);
}
// if requestMemorySegments() blocks, this will make
sure that enough buffers are freed
// eventually for it to continue
bufferRecycler = new Thread(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {
}
for (Buffer buffer : buffers) {
buffer.recycle();
}
});
bufferRecycler.start();
// take more buffers than are freely available at the
moment via requestMemorySegments()
memorySegments =
networkBufferPool.requestMemorySegments(numBuffers / 2);
assertThat(memorySegments, not(hasItem(nullValue())));
} finally {
if (bufferRecycler != null) {
bufferRecycler.join();
}
if (lbp1 != null) {
lbp1.lazyDestroy();
}
networkBufferPool.recycleMemorySegments(memorySegments);
}
}
```
Either all code using this method or the resulting list handles `null`
elements in the returned list (similar to `requestMemorySegment()` used by
`LocalBufferPool#requestBuffer()`) or you must block until you can deliver all
requested elements. I prefer the second option (a method Javadoc should be
added to describe the implemented behaviour) in which case, however, you will
have to call `redistributeBuffers()` before acquiring the segments and you also
have to be careful with the `factoryLock` - I suppose the following may be
correct:
```
public List<MemorySegment> requestMemorySegments(int
numRequiredBuffers) throws IOException {
checkArgument(numRequiredBuffers > 0, "The number of required
buffers should be larger than 0.");
synchronized (factoryLock) {
// ...
this.numTotalRequiredBuffers += numRequiredBuffers;
redistributeBuffers();
}
final List<MemorySegment> segments = new
ArrayList<>(numRequiredBuffers);
for (int i = 0 ; i < numRequiredBuffers ; i++) {
try {
segments.add(availableMemorySegments.take());
} catch (InterruptedException e) {
recycleMemorySegments(segments);
ExceptionUtils.rethrowIOException(e);
}
}
return segments;
}
```
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---