Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4758#discussion_r148402375
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
---
@@ -53,9 +64,89 @@ public void verifyAllBuffersReturned() {
networkBufferPool.destroy();
}
- @Test(expected = IOException.class)
- public void testRequireMoreThanPossible() throws IOException {
-
networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments()
* 2, Integer.MAX_VALUE);
+ /**
+ * Tests creating one buffer pool which requires more buffers than
available.
+ */
+ @Test
+ public void testRequireMoreThanPossible1() throws IOException {
+ expectedException.expect(IOException.class);
+ expectedException.expectMessage("Insufficient number of network
buffers");
+
+
networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments()
+ 1,
+ Integer.MAX_VALUE);
+ }
+
+ /**
+ * Tests creating two buffer pools which together require more buffers
than available.
+ */
+ @Test
+ public void testRequireMoreThanPossible2() throws IOException {
+ expectedException.expect(IOException.class);
+ expectedException.expectMessage("Insufficient number of network
buffers");
+
+ networkBufferPool.createBufferPool(numBuffers / 2 + 1,
numBuffers);
+ networkBufferPool.createBufferPool(numBuffers / 2 + 1,
numBuffers);
+ }
+
+ /**
+ * Tests creating two buffer pools which together require as many
buffers as available but where
+ * there are less buffers available to the {@link NetworkBufferPool} at
the time of the second
+ * {@link LocalBufferPool} creation.
+ */
+ @Test
+ public void testOverprovisioned() throws IOException {
+ int buffersToTakeFromPool1 = numBuffers / 2 + 1;
+ int buffersToTakeFromPool2 = numBuffers -
buffersToTakeFromPool1;
+
+ List<Buffer> buffers = new ArrayList<>(numBuffers);
+ BufferPool lbp1 = null, lbp2 = null;
+ try {
+ lbp1 =
networkBufferPool.createBufferPool(buffersToTakeFromPool2, numBuffers);
+
+ // take more buffers than the minimum required
+ for (int i = 0; i < buffersToTakeFromPool1; ++i) {
+ Buffer buffer = lbp1.requestBuffer();
+ buffers.add(buffer);
+ assertNotNull(buffer);
+ }
+ assertEquals(buffersToTakeFromPool1,
lbp1.bestEffortGetNumOfUsedBuffers());
+ assertEquals(numBuffers, lbp1.getNumBuffers());
+
+ // create a second pool which requires more than are
freely available at the moment
+ lbp2 =
networkBufferPool.createBufferPool(buffersToTakeFromPool1, numBuffers);
+
+ assertEquals(lbp2.getNumberOfRequiredMemorySegments(),
lbp2.getNumBuffers());
+ assertEquals(lbp1.getNumberOfRequiredMemorySegments(),
lbp1.getNumBuffers());
+ assertNull(lbp1.requestBuffer());
+
+ // take all remaining buffers
+ for (int i = 0; i < buffersToTakeFromPool2; ++i) {
+ Buffer buffer = lbp2.requestBuffer();
+ buffers.add(buffer);
+ assertNotNull(buffer);
+ }
+ assertEquals(buffersToTakeFromPool2,
lbp2.bestEffortGetNumOfUsedBuffers());
+
+ // we should be able to get one more but this is
currently given out to lbp1 and taken by buffer1
+ assertNull(lbp2.requestBuffer());
+
+ // as soon as the excess buffer of lbp1 is recycled, it
should be available for lbp2
+ buffers.remove(buffers.size() - 1).recycle();
+
+ Buffer buffer = lbp2.requestBuffer();
+ buffers.add(buffer);
+ assertNotNull(buffer);
+ } finally {
+ for (Buffer buffer : buffers) {
+ buffer.recycle();
+ }
+ if (lbp1 != null) {
--- End diff --
actually, we don't want to extend further/add dependencies on guava, afaik
---