[
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16147000#comment-16147000
]
ASF GitHub Bot commented on FLINK-7378:
---------------------------------------
Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4485#discussion_r136018729
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
---
@@ -131,6 +135,59 @@ public void recycle(MemorySegment segment) {
availableMemorySegments.add(segment);
}
+ public List<MemorySegment> requestMemorySegments(int
numRequiredBuffers) throws IOException {
+ checkArgument(numRequiredBuffers > 0, "The number of required
buffers should be larger than 0.");
+
+ synchronized (factoryLock) {
+ if (isDestroyed) {
+ throw new IllegalStateException("Network buffer
pool has already been destroyed.");
+ }
+
+ if (numTotalRequiredBuffers + numRequiredBuffers >
totalNumberOfMemorySegments) {
+ throw new
IOException(String.format("Insufficient number of network buffers: " +
+ "required %d,
but only %d available. The total number of network " +
+ "buffers is
currently set to %d of %d bytes each. You can increase this " +
+ "number by
setting the configuration keys '%s', '%s', and '%s'.",
+ numRequiredBuffers,
+ totalNumberOfMemorySegments -
numTotalRequiredBuffers,
+ totalNumberOfMemorySegments,
+ memorySegmentSize,
+
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
+
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
+
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key()));
+ }
+
+ this.numTotalRequiredBuffers += numRequiredBuffers;
+
+ final List<MemorySegment> segments = new
ArrayList<>(numRequiredBuffers);
+ for (int i = 0 ; i < numRequiredBuffers ; i++) {
+ segments.add(availableMemorySegments.poll());
+ }
+
+ try {
+ redistributeBuffers();
--- End diff --
There are still some corner cases not handled properly by this
implementation: consider the following unit test (please also add it to
`NetworkBufferPoolTest` or `BufferPoolFactoryTest`):
```
/**
* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the
{@link NetworkBufferPool}
* currently not containing the number of required free segments
(currently occupied by a buffer
* pool).
*/
@Test
public void testRequestMemorySegmentsWithBuffersTaken() throws
IOException, InterruptedException {
final int numBuffers = 10;
NetworkBufferPool networkBufferPool = new
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
final List<Buffer> buffers = new ArrayList<>(numBuffers);
List<MemorySegment> memorySegments = Collections.emptyList();
Thread bufferRecycler = null;
BufferPool lbp1 = null;
try {
lbp1 = networkBufferPool.createBufferPool(numBuffers /
2, numBuffers);
// take all buffers (more than the minimum required)
for (int i = 0; i < numBuffers; ++i) {
Buffer buffer = lbp1.requestBuffer();
buffers.add(buffer);
assertNotNull(buffer);
}
// if requestMemorySegments() blocks, this will make
sure that enough buffers are freed
// eventually for it to continue
bufferRecycler = new Thread(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {
}
for (Buffer buffer : buffers) {
buffer.recycle();
}
});
bufferRecycler.start();
// take more buffers than are freely available at the
moment via requestMemorySegments()
memorySegments =
networkBufferPool.requestMemorySegments(numBuffers / 2);
assertThat(memorySegments, not(hasItem(nullValue())));
} finally {
if (bufferRecycler != null) {
bufferRecycler.join();
}
if (lbp1 != null) {
lbp1.lazyDestroy();
}
networkBufferPool.recycleMemorySegments(memorySegments);
}
}
```
Either all code using this method or the resulting list handles `null`
elements in the returned list (similar to `requestMemorySegment()` used by
`LocalBufferPool#requestBuffer()`) or you must block until you can deliver all
requested elements. I prefer the second option (a method Javadoc should be
added to describe the implemented behaviour) in which case, however, you will
have to call `redistributeBuffers()` before acquiring the segments and you also
have to be careful with the `factoryLock` - I suppose the following may be
correct:
```
public List<MemorySegment> requestMemorySegments(int
numRequiredBuffers) throws IOException {
checkArgument(numRequiredBuffers > 0, "The number of required
buffers should be larger than 0.");
synchronized (factoryLock) {
// ...
this.numTotalRequiredBuffers += numRequiredBuffers;
redistributeBuffers();
}
final List<MemorySegment> segments = new
ArrayList<>(numRequiredBuffers);
for (int i = 0 ; i < numRequiredBuffers ; i++) {
try {
segments.add(availableMemorySegments.take());
} catch (InterruptedException e) {
recycleMemorySegments(segments);
ExceptionUtils.rethrowIOException(e);
}
}
return segments;
}
```
> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -----------------------------------------------------------------------------
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
> Issue Type: Sub-task
> Components: Core
> Reporter: zhijiang
> Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for
> {{SingleInputGate}} is limited by {{a * <number of channels> + b}}, where a
> is the number of exclusive buffers for each channel and b is the number of
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}.
> And the exclusive buffers are assigned to {{InputChannel}}s directly.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)