This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.12 by this push:
new caf1c47 GEODE-8506: BufferPool returns byte buffers that may be much
larger t… (#5525)
caf1c47 is described below
commit caf1c4783478c2b5c5e5c239874092346316f4fe
Author: Bruce Schuchardt <[email protected]>
AuthorDate: Fri Sep 18 14:21:55 2020 -0700
GEODE-8506: BufferPool returns byte buffers that may be much larger t…
(#5525)
* GEODE-8506: BufferPool returns byte buffers that may be much larger than
requested
Create a "slice" of the acquired buffer to return to the caller instead
of returning a buffer larger than what was requested. On return we fish
out the parent buffer and put it back in the pool.
* cache reflection method, remove magic numbers in test and BufferPool
(cherry picked from commit a11b9c076a72609ff00802c010b6e32262228776)
---
.../distributed/internal/DistributionConfig.java | 2 +-
.../org/apache/geode/internal/net/BufferPool.java | 75 ++++++++++++++++++----
.../apache/geode/internal/net/BufferPoolTest.java | 53 +++++++++++++++
.../geode/internal/net/NioPlainEngineTest.java | 1 -
4 files changed, 115 insertions(+), 16 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java
index 89a19a6..0074881 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java
@@ -1386,7 +1386,7 @@ public interface DistributionConfig extends Config, LogConfig, StatisticsConfig
/**
* The default value of the {@link ConfigurationProperties#SOCKET_BUFFER_SIZE} property
*/
- int DEFAULT_SOCKET_BUFFER_SIZE = 32768;
+ static int DEFAULT_SOCKET_BUFFER_SIZE = 32768;
/**
* The minimum {@link ConfigurationProperties#SOCKET_BUFFER_SIZE}.
* <p>
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
index 0997c6e..e119250 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
@@ -15,16 +15,21 @@
package org.apache.geode.internal.net;
import java.lang.ref.SoftReference;
+import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.IdentityHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.geode.InternalGemFireException;
+import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.internal.Assert;
public class BufferPool {
private final DMStats stats;
+ private Method parentOfSliceMethod;
+
/**
* Buffers may be acquired from the Buffers pool
* or they may be allocated using Buffer.allocate(). This enum is used
@@ -85,21 +90,27 @@ public class BufferPool {
stats.incReceiverBufferSize(-refSize, true);
}
}
- } else if (bb.capacity() >= size) {
- bb.rewind();
- bb.limit(size);
- return bb;
} else {
- // wasn't big enough so put it back in the queue
- Assert.assertTrue(bufferQueue.offer(ref));
- if (alreadySeen == null) {
- alreadySeen = new IdentityHashMap<>();
- }
- if (alreadySeen.put(ref, ref) != null) {
- // if it returns non-null then we have already seen this item
- // so we have worked all the way through the queue once.
- // So it is time to give up and allocate a new buffer.
- break;
+ int capacity = bb.capacity();
+ if (capacity > size) {
+ bb.position(0).limit(size);
+ return bb.slice();
+ } else if (capacity == size) {
+ bb.rewind();
+ bb.limit(size);
+ return bb;
+ } else {
+ // wasn't big enough so put it back in the queue
+ Assert.assertTrue(bufferQueue.offer(ref));
+ if (alreadySeen == null) {
+ alreadySeen = new IdentityHashMap<>();
+ }
+ if (alreadySeen.put(ref, ref) != null) {
+ // if it returns non-null then we have already seen this item
+ // so we have worked all the way through the queue once.
+ // So it is time to give up and allocate a new buffer.
+ break;
+ }
}
}
ref = bufferQueue.poll();
@@ -227,6 +238,7 @@ public class BufferPool {
*/
private void releaseBuffer(ByteBuffer bb, boolean send) {
if (bb.isDirect()) {
+ bb = getPoolableBuffer(bb);
BBSoftReference bbRef = new BBSoftReference(bb, send);
bufferQueue.offer(bbRef);
} else {
@@ -239,6 +251,41 @@ public class BufferPool {
}
/**
+ * If we hand out a buffer that is larger than the requested size we create a
+ * "slice" of the buffer having the requested capacity and hand that out instead.
+ * When we put the buffer back in the pool we need to find the original, non-sliced,
+ * buffer. This is held in DirectBuffer in its "attachment" field, which is a public
+ * method, though DirectBuffer is package-private.
+ */
+ @VisibleForTesting
+ public ByteBuffer getPoolableBuffer(ByteBuffer buffer) {
+ ByteBuffer result = buffer;
+ if (parentOfSliceMethod == null) {
+ Class clazz = buffer.getClass();
+ try {
+ Method method = clazz.getMethod("attachment");
+ method.setAccessible(true);
+ parentOfSliceMethod = method;
+ } catch (Exception e) {
+ throw new InternalGemFireException("unable to retrieve underlying byte buffer", e);
+ }
+ }
+ try {
+ Object attachment = parentOfSliceMethod.invoke(buffer);
+ if (attachment instanceof ByteBuffer) {
+ result = (ByteBuffer) attachment;
+ } else if (attachment != null) {
+ throw new InternalGemFireException(
+ "direct byte buffer attachment was not a byte buffer but a " +
+ attachment.getClass().getName());
+ }
+ } catch (Exception e) {
+ throw new InternalGemFireException("unable to retrieve underlying byte buffer", e);
+ }
+ return result;
+ }
+
+ /**
* A soft reference that remembers the size of the byte buffer it refers to. TODO Dan - I really
* think this should be a weak reference. The JVM doesn't seem to clear soft references if it is
* getting low on direct memory.
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java
index cc441e4..dec5553 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java
@@ -113,4 +113,57 @@ public class BufferPoolTest {
assertThat(newBuffer.position()).isEqualTo(16384);
assertThat(newBuffer.limit()).isEqualTo(newBuffer.capacity());
}
+
+ @Test
+ public void checkBufferSizeAfterAllocation() throws Exception {
+ ByteBuffer buffer = bufferPool.acquireDirectReceiveBuffer(100);
+
+ ByteBuffer newBuffer =
+ bufferPool.acquireDirectReceiveBuffer(10000);
+ assertThat(buffer.isDirect()).isTrue();
+ assertThat(newBuffer.isDirect()).isTrue();
+ assertThat(buffer.capacity()).isEqualTo(100);
+ assertThat(newBuffer.capacity()).isEqualTo(10000);
+
+ // buffer should be ready to read the same amount of data
+ assertThat(buffer.position()).isEqualTo(0);
+ assertThat(buffer.limit()).isEqualTo(100);
+ assertThat(newBuffer.position()).isEqualTo(0);
+ assertThat(newBuffer.limit()).isEqualTo(10000);
+ }
+
+ @Test
+ public void checkBufferSizeAfterAcquire() throws Exception {
+ ByteBuffer buffer = bufferPool.acquireDirectReceiveBuffer(100);
+
+ ByteBuffer newBuffer =
+ bufferPool.acquireDirectReceiveBuffer(10000);
+ assertThat(buffer.capacity()).isEqualTo(100);
+ assertThat(newBuffer.capacity()).isEqualTo(10000);
+ assertThat(buffer.isDirect()).isTrue();
+ assertThat(newBuffer.isDirect()).isTrue();
+
+ assertThat(buffer.position()).isEqualTo(0);
+ assertThat(buffer.limit()).isEqualTo(100);
+ assertThat(newBuffer.position()).isEqualTo(0);
+ assertThat(newBuffer.limit()).isEqualTo(10000);
+
+ bufferPool.releaseReceiveBuffer(buffer);
+ bufferPool.releaseReceiveBuffer(newBuffer);
+
+ buffer = bufferPool.acquireDirectReceiveBuffer(1000);
+ newBuffer =
+ bufferPool.acquireDirectReceiveBuffer(15000);
+
+ assertThat(buffer.capacity()).isEqualTo(1000);
+ assertThat(newBuffer.capacity()).isEqualTo(15000);
+ assertThat(buffer.isDirect()).isTrue();
+ assertThat(newBuffer.isDirect()).isTrue();
+
+ assertThat(buffer.position()).isEqualTo(0);
+ assertThat(buffer.limit()).isEqualTo(1000);
+ assertThat(newBuffer.position()).isEqualTo(0);
+ assertThat(newBuffer.limit()).isEqualTo(15000);
+ }
+
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
index e9785de..166c2a5 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
@@ -57,7 +57,6 @@ public class NioPlainEngineTest {
@Test
public void ensureWrappedCapacity() {
ByteBuffer wrappedBuffer = bufferPool.acquireDirectReceiveBuffer(100);
- verify(mockStats, times(1)).incReceiverBufferSize(any(Integer.class), any(Boolean.class));
wrappedBuffer.put(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
nioEngine.lastReadPosition = 10;
int requestedCapacity = 210;