This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new a11b9c0 GEODE-8506: BufferPool returns byte buffers that may be much
larger t… (#5525)
a11b9c0 is described below
commit a11b9c076a72609ff00802c010b6e32262228776
Author: Bruce Schuchardt <[email protected]>
AuthorDate: Fri Sep 18 14:21:55 2020 -0700
GEODE-8506: BufferPool returns byte buffers that may be much larger t…
(#5525)
* GEODE-8506: BufferPool returns byte buffers that may be much larger than
requested
Create a "slice" of the acquired buffer to return to the caller instead
of returning a buffer larger than what was requested. On return we fish
out the parent buffer and put it back in the pool.
* cache reflection method, remove magic numbers in test and BufferPool
The one dunit failure seems to be GEODE-7710
---
.../distributed/internal/DistributionConfig.java | 2 +-
.../org/apache/geode/internal/net/BufferPool.java | 74 ++++++++++++++++++----
.../apache/geode/internal/net/BufferPoolTest.java | 28 ++++++--
.../geode/internal/net/NioPlainEngineTest.java | 3 +-
.../apache/geode/internal/tcp/MsgStreamerTest.java | 2 +
5 files changed, 85 insertions(+), 24 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java
index 1d69fcb..9aad592 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java
@@ -1396,7 +1396,7 @@ public interface DistributionConfig extends Config,
LogConfig, StatisticsConfig
/**
* The default value of the {@link
ConfigurationProperties#SOCKET_BUFFER_SIZE} property
*/
- int DEFAULT_SOCKET_BUFFER_SIZE = 32768;
+ static int DEFAULT_SOCKET_BUFFER_SIZE = 32768;
/**
* The minimum {@link ConfigurationProperties#SOCKET_BUFFER_SIZE}.
* <p>
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
index 26d069b..20b138e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
@@ -15,12 +15,15 @@
package org.apache.geode.internal.net;
import java.lang.ref.SoftReference;
+import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.IdentityHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.logging.log4j.Logger;
+import org.apache.geode.InternalGemFireException;
+import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.internal.Assert;
@@ -32,6 +35,8 @@ public class BufferPool {
private final DMStats stats;
private static final Logger logger = LogService.getLogger();
+ private Method parentOfSliceMethod;
+
/**
* Buffers may be acquired from the Buffers pool
* or they may be allocated using Buffer.allocate(). This enum is used
@@ -65,10 +70,10 @@ public class BufferPool {
private final ConcurrentLinkedQueue<BBSoftReference> bufferLargeQueue =
new ConcurrentLinkedQueue<>();
- private final int SMALL_BUFFER_SIZE = Connection.SMALL_BUFFER_SIZE;
+ static final int SMALL_BUFFER_SIZE = Connection.SMALL_BUFFER_SIZE;
- private final int MEDIUM_BUFFER_SIZE =
DistributionConfig.DEFAULT_SOCKET_BUFFER_SIZE;
+ static final int MEDIUM_BUFFER_SIZE =
DistributionConfig.DEFAULT_SOCKET_BUFFER_SIZE;
/**
@@ -98,14 +103,18 @@ public class BufferPool {
if (useDirectBuffers) {
if (size <= MEDIUM_BUFFER_SIZE) {
- return acquirePredefinedFixedBuffer(send, size);
+ result = acquirePredefinedFixedBuffer(send, size);
} else {
- return acquireLargeBuffer(send, size);
+ result = acquireLargeBuffer(send, size);
}
- } else {
- // if we are using heap buffers then don't bother with keeping them
around
- result = ByteBuffer.allocate(size);
+ if (result.capacity() > size) {
+ result.position(0).limit(size);
+ result = result.slice();
+ }
+ return result;
}
+ // if we are using heap buffers then don't bother with keeping them around
+ result = ByteBuffer.allocate(size);
updateBufferStats(size, send, false);
return result;
}
@@ -123,7 +132,8 @@ public class BufferPool {
}
/**
- * Acquire direct buffer with predefined default capacity (4096 or 32768)
+ * Acquire direct buffer with predefined default capacity (SMALL_BUFFER_SIZE
or
+ * MEDIUM_BUFFER_SIZE)
*/
private ByteBuffer acquirePredefinedFixedBuffer(boolean send, int size) {
// set
@@ -295,22 +305,58 @@ public class BufferPool {
/**
* Releases a previously acquired buffer.
*/
- private void releaseBuffer(ByteBuffer bb, boolean send) {
- if (bb.isDirect()) {
- BBSoftReference bbRef = new BBSoftReference(bb, send);
- if (bb.capacity() <= SMALL_BUFFER_SIZE) {
+ private void releaseBuffer(ByteBuffer buffer, boolean send) {
+ if (buffer.isDirect()) {
+ buffer = getPoolableBuffer(buffer);
+ BBSoftReference bbRef = new BBSoftReference(buffer, send);
+ if (buffer.capacity() <= SMALL_BUFFER_SIZE) {
bufferSmallQueue.offer(bbRef);
- } else if (bb.capacity() <= MEDIUM_BUFFER_SIZE) {
+ } else if (buffer.capacity() <= MEDIUM_BUFFER_SIZE) {
bufferMiddleQueue.offer(bbRef);
} else {
bufferLargeQueue.offer(bbRef);
}
} else {
- updateBufferStats(-bb.capacity(), send, false);
+ updateBufferStats(-buffer.capacity(), send, false);
}
}
/**
+ * If we hand out a buffer that is larger than the requested size we create a
+ * "slice" of the buffer having the requested capacity and hand that out
instead.
+ * When we put the buffer back in the pool we need to find the original,
non-sliced,
+ * buffer. This is held in DirectBuffer in its "attachment" field, which is
a public
+ * method, though DirectBuffer is package-private.
+ */
+ @VisibleForTesting
+ public ByteBuffer getPoolableBuffer(ByteBuffer buffer) {
+ ByteBuffer result = buffer;
+ if (parentOfSliceMethod == null) {
+ Class clazz = buffer.getClass();
+ try {
+ Method method = clazz.getMethod("attachment");
+ method.setAccessible(true);
+ parentOfSliceMethod = method;
+ } catch (Exception e) {
+ throw new InternalGemFireException("unable to retrieve underlying byte
buffer", e);
+ }
+ }
+ try {
+ Object attachment = parentOfSliceMethod.invoke(buffer);
+ if (attachment instanceof ByteBuffer) {
+ result = (ByteBuffer) attachment;
+ } else if (attachment != null) {
+ throw new InternalGemFireException(
+ "direct byte buffer attachment was not a byte buffer but a " +
+ attachment.getClass().getName());
+ }
+ } catch (Exception e) {
+ throw new InternalGemFireException("unable to retrieve underlying byte
buffer", e);
+ }
+ return result;
+ }
+
+ /**
* Update buffer stats.
*/
private void updateBufferStats(int size, boolean send, boolean direct) {
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java
b/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java
index 81382f0..7b2e86a 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java
@@ -121,8 +121,10 @@ public class BufferPoolTest {
ByteBuffer newBuffer =
bufferPool.acquireDirectReceiveBuffer(10000);
- assertThat(buffer.capacity()).isGreaterThanOrEqualTo(4096);
- assertThat(newBuffer.capacity()).isGreaterThanOrEqualTo(32768);
+ assertThat(buffer.isDirect()).isTrue();
+ assertThat(newBuffer.isDirect()).isTrue();
+ assertThat(buffer.capacity()).isEqualTo(100);
+ assertThat(newBuffer.capacity()).isEqualTo(10000);
// buffer should be ready to read the same amount of data
assertThat(buffer.position()).isEqualTo(0);
@@ -137,23 +139,35 @@ public class BufferPoolTest {
ByteBuffer newBuffer =
bufferPool.acquireDirectReceiveBuffer(10000);
- assertThat(buffer.capacity()).isGreaterThanOrEqualTo(4096);
- assertThat(newBuffer.capacity()).isGreaterThanOrEqualTo(32768);
+ assertThat(buffer.capacity()).isEqualTo(100);
+ assertThat(newBuffer.capacity()).isEqualTo(10000);
+ assertThat(buffer.isDirect()).isTrue();
+ assertThat(newBuffer.isDirect()).isTrue();
+ assertThat(bufferPool.getPoolableBuffer(buffer).capacity())
+ .isGreaterThanOrEqualTo(BufferPool.SMALL_BUFFER_SIZE);
+ assertThat(bufferPool.getPoolableBuffer(newBuffer).capacity())
+ .isGreaterThanOrEqualTo(BufferPool.MEDIUM_BUFFER_SIZE);
assertThat(buffer.position()).isEqualTo(0);
assertThat(buffer.limit()).isEqualTo(100);
assertThat(newBuffer.position()).isEqualTo(0);
assertThat(newBuffer.limit()).isEqualTo(10000);
+
bufferPool.releaseReceiveBuffer(buffer);
bufferPool.releaseReceiveBuffer(newBuffer);
buffer = bufferPool.acquireDirectReceiveBuffer(1000);
-
newBuffer =
bufferPool.acquireDirectReceiveBuffer(15000);
- assertThat(buffer.capacity()).isGreaterThanOrEqualTo(4096);
- assertThat(newBuffer.capacity()).isGreaterThanOrEqualTo(32768);
+ assertThat(buffer.capacity()).isEqualTo(1000);
+ assertThat(newBuffer.capacity()).isEqualTo(15000);
+ assertThat(buffer.isDirect()).isTrue();
+ assertThat(newBuffer.isDirect()).isTrue();
+ assertThat(bufferPool.getPoolableBuffer(buffer).capacity())
+ .isGreaterThanOrEqualTo(BufferPool.SMALL_BUFFER_SIZE);
+ assertThat(bufferPool.getPoolableBuffer(newBuffer).capacity())
+ .isGreaterThanOrEqualTo(BufferPool.MEDIUM_BUFFER_SIZE);
assertThat(buffer.position()).isEqualTo(0);
assertThat(buffer.limit()).isEqualTo(1000);
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
index b4eae2f..3d394fb 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
@@ -57,13 +57,12 @@ public class NioPlainEngineTest {
@Test
public void ensureWrappedCapacity() {
ByteBuffer wrappedBuffer = bufferPool.acquireDirectReceiveBuffer(100);
- verify(mockStats, times(1)).incReceiverBufferSize(any(Integer.class),
any(Boolean.class));
wrappedBuffer.put(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
nioEngine.lastReadPosition = 10;
int requestedCapacity = 210;
ByteBuffer result = nioEngine.ensureWrappedCapacity(requestedCapacity,
wrappedBuffer,
BufferPool.BufferType.TRACKED_RECEIVER);
- verify(mockStats, times(1)).incReceiverBufferSize(any(Integer.class),
any(Boolean.class));
+ verify(mockStats, times(2)).incReceiverBufferSize(any(Integer.class),
any(Boolean.class));
assertThat(result.capacity()).isGreaterThanOrEqualTo(requestedCapacity);
assertThat(result).isGreaterThanOrEqualTo(wrappedBuffer);
// make sure that data was transferred to the new buffer
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/tcp/MsgStreamerTest.java
b/geode-core/src/test/java/org/apache/geode/internal/tcp/MsgStreamerTest.java
index b7df195..60d4578 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/tcp/MsgStreamerTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/tcp/MsgStreamerTest.java
@@ -88,7 +88,9 @@ public class MsgStreamerTest {
when(connection1.getRemoteAddress()).thenReturn(member1);
when(connection1.getRemoteVersion()).thenReturn(KnownVersion.CURRENT);
+
when(connection1.getSendBufferSize()).thenReturn(Connection.SMALL_BUFFER_SIZE);
when(connection2.getRemoteAddress()).thenReturn(member2);
+
when(connection2.getSendBufferSize()).thenReturn(Connection.SMALL_BUFFER_SIZE);
if (mixedDestinationVersions) {
when(connection1.getRemoteVersion()).thenReturn(KnownVersion.GEODE_1_12_0);
} else {