This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch support/1.13
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.13 by this push:
     new 709fe7b  GEODE-8506: BufferPool returns byte buffers that may be much 
larger t… (#5525)
709fe7b is described below

commit 709fe7b8c4c61eb610a73fcf71005fe93a4d6e1c
Author: Bruce Schuchardt <[email protected]>
AuthorDate: Fri Sep 18 14:21:55 2020 -0700

    GEODE-8506: BufferPool returns byte buffers that may be much larger t… 
(#5525)
    
    * GEODE-8506: BufferPool returns byte buffers that may be much larger than 
requested
    
    Create a "slice" of the acquired buffer to return to the caller instead
    of returning a buffer larger than what was requested.  On return we fish
    out the parent buffer and put it back in the pool.
    
    * cache reflection method, remove magic numbers in test and BufferPool
    
    (cherry picked from commit a11b9c076a72609ff00802c010b6e32262228776)
---
 .../distributed/internal/DistributionConfig.java   |  2 +-
 .../org/apache/geode/internal/net/BufferPool.java  | 74 ++++++++++++++++++----
 .../apache/geode/internal/net/BufferPoolTest.java  | 28 ++++++--
 .../geode/internal/net/NioPlainEngineTest.java     |  3 +-
 .../apache/geode/internal/tcp/MsgStreamerTest.java |  2 +
 5 files changed, 85 insertions(+), 24 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java
index 3c66211..bcb73ed 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java
@@ -1386,7 +1386,7 @@ public interface DistributionConfig extends Config, 
LogConfig, StatisticsConfig
   /**
    * The default value of the {@link 
ConfigurationProperties#SOCKET_BUFFER_SIZE} property
    */
-  int DEFAULT_SOCKET_BUFFER_SIZE = 32768;
+  static int DEFAULT_SOCKET_BUFFER_SIZE = 32768;
   /**
    * The minimum {@link ConfigurationProperties#SOCKET_BUFFER_SIZE}.
    * <p>
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java 
b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
index 26d069b..20b138e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
@@ -15,12 +15,15 @@
 package org.apache.geode.internal.net;
 
 import java.lang.ref.SoftReference;
+import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.util.IdentityHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.logging.log4j.Logger;
 
+import org.apache.geode.InternalGemFireException;
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.internal.Assert;
@@ -32,6 +35,8 @@ public class BufferPool {
   private final DMStats stats;
   private static final Logger logger = LogService.getLogger();
 
+  private Method parentOfSliceMethod;
+
   /**
    * Buffers may be acquired from the Buffers pool
    * or they may be allocated using Buffer.allocate(). This enum is used
@@ -65,10 +70,10 @@ public class BufferPool {
   private final ConcurrentLinkedQueue<BBSoftReference> bufferLargeQueue =
       new ConcurrentLinkedQueue<>();
 
-  private final int SMALL_BUFFER_SIZE = Connection.SMALL_BUFFER_SIZE;
+  static final int SMALL_BUFFER_SIZE = Connection.SMALL_BUFFER_SIZE;
 
 
-  private final int MEDIUM_BUFFER_SIZE = 
DistributionConfig.DEFAULT_SOCKET_BUFFER_SIZE;
+  static final int MEDIUM_BUFFER_SIZE = 
DistributionConfig.DEFAULT_SOCKET_BUFFER_SIZE;
 
 
   /**
@@ -98,14 +103,18 @@ public class BufferPool {
 
     if (useDirectBuffers) {
       if (size <= MEDIUM_BUFFER_SIZE) {
-        return acquirePredefinedFixedBuffer(send, size);
+        result = acquirePredefinedFixedBuffer(send, size);
       } else {
-        return acquireLargeBuffer(send, size);
+        result = acquireLargeBuffer(send, size);
       }
-    } else {
-      // if we are using heap buffers then don't bother with keeping them 
around
-      result = ByteBuffer.allocate(size);
+      if (result.capacity() > size) {
+        result.position(0).limit(size);
+        result = result.slice();
+      }
+      return result;
     }
+    // if we are using heap buffers then don't bother with keeping them around
+    result = ByteBuffer.allocate(size);
     updateBufferStats(size, send, false);
     return result;
   }
@@ -123,7 +132,8 @@ public class BufferPool {
   }
 
   /**
-   * Acquire direct buffer with predefined default capacity (4096 or 32768)
+   * Acquire direct buffer with predefined default capacity (SMALL_BUFFER_SIZE 
or
+   * MEDIUM_BUFFER_SIZE)
    */
   private ByteBuffer acquirePredefinedFixedBuffer(boolean send, int size) {
     // set
@@ -295,22 +305,58 @@ public class BufferPool {
   /**
    * Releases a previously acquired buffer.
    */
-  private void releaseBuffer(ByteBuffer bb, boolean send) {
-    if (bb.isDirect()) {
-      BBSoftReference bbRef = new BBSoftReference(bb, send);
-      if (bb.capacity() <= SMALL_BUFFER_SIZE) {
+  private void releaseBuffer(ByteBuffer buffer, boolean send) {
+    if (buffer.isDirect()) {
+      buffer = getPoolableBuffer(buffer);
+      BBSoftReference bbRef = new BBSoftReference(buffer, send);
+      if (buffer.capacity() <= SMALL_BUFFER_SIZE) {
         bufferSmallQueue.offer(bbRef);
-      } else if (bb.capacity() <= MEDIUM_BUFFER_SIZE) {
+      } else if (buffer.capacity() <= MEDIUM_BUFFER_SIZE) {
         bufferMiddleQueue.offer(bbRef);
       } else {
         bufferLargeQueue.offer(bbRef);
       }
     } else {
-      updateBufferStats(-bb.capacity(), send, false);
+      updateBufferStats(-buffer.capacity(), send, false);
     }
   }
 
   /**
+   * If we hand out a buffer that is larger than the requested size we create a
+   * "slice" of the buffer having the requested capacity and hand that out 
instead.
+   * When we put the buffer back in the pool we need to find the original, 
non-sliced,
+   * buffer. This is held in DirectBuffer in its "attachment" field, which is 
a public
+   * method, though DirectBuffer is package-private.
+   */
+  @VisibleForTesting
+  public ByteBuffer getPoolableBuffer(ByteBuffer buffer) {
+    ByteBuffer result = buffer;
+    if (parentOfSliceMethod == null) {
+      Class clazz = buffer.getClass();
+      try {
+        Method method = clazz.getMethod("attachment");
+        method.setAccessible(true);
+        parentOfSliceMethod = method;
+      } catch (Exception e) {
+        throw new InternalGemFireException("unable to retrieve underlying byte 
buffer", e);
+      }
+    }
+    try {
+      Object attachment = parentOfSliceMethod.invoke(buffer);
+      if (attachment instanceof ByteBuffer) {
+        result = (ByteBuffer) attachment;
+      } else if (attachment != null) {
+        throw new InternalGemFireException(
+            "direct byte buffer attachment was not a byte buffer but a " +
+                attachment.getClass().getName());
+      }
+    } catch (Exception e) {
+      throw new InternalGemFireException("unable to retrieve underlying byte 
buffer", e);
+    }
+    return result;
+  }
+
+  /**
    * Update buffer stats.
    */
   private void updateBufferStats(int size, boolean send, boolean direct) {
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java 
b/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java
index 81382f0..7b2e86a 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java
@@ -121,8 +121,10 @@ public class BufferPoolTest {
 
     ByteBuffer newBuffer =
         bufferPool.acquireDirectReceiveBuffer(10000);
-    assertThat(buffer.capacity()).isGreaterThanOrEqualTo(4096);
-    assertThat(newBuffer.capacity()).isGreaterThanOrEqualTo(32768);
+    assertThat(buffer.isDirect()).isTrue();
+    assertThat(newBuffer.isDirect()).isTrue();
+    assertThat(buffer.capacity()).isEqualTo(100);
+    assertThat(newBuffer.capacity()).isEqualTo(10000);
 
     // buffer should be ready to read the same amount of data
     assertThat(buffer.position()).isEqualTo(0);
@@ -137,23 +139,35 @@ public class BufferPoolTest {
 
     ByteBuffer newBuffer =
         bufferPool.acquireDirectReceiveBuffer(10000);
-    assertThat(buffer.capacity()).isGreaterThanOrEqualTo(4096);
-    assertThat(newBuffer.capacity()).isGreaterThanOrEqualTo(32768);
+    assertThat(buffer.capacity()).isEqualTo(100);
+    assertThat(newBuffer.capacity()).isEqualTo(10000);
+    assertThat(buffer.isDirect()).isTrue();
+    assertThat(newBuffer.isDirect()).isTrue();
+    assertThat(bufferPool.getPoolableBuffer(buffer).capacity())
+        .isGreaterThanOrEqualTo(BufferPool.SMALL_BUFFER_SIZE);
+    assertThat(bufferPool.getPoolableBuffer(newBuffer).capacity())
+        .isGreaterThanOrEqualTo(BufferPool.MEDIUM_BUFFER_SIZE);
 
     assertThat(buffer.position()).isEqualTo(0);
     assertThat(buffer.limit()).isEqualTo(100);
     assertThat(newBuffer.position()).isEqualTo(0);
     assertThat(newBuffer.limit()).isEqualTo(10000);
+
     bufferPool.releaseReceiveBuffer(buffer);
     bufferPool.releaseReceiveBuffer(newBuffer);
 
     buffer = bufferPool.acquireDirectReceiveBuffer(1000);
-
     newBuffer =
         bufferPool.acquireDirectReceiveBuffer(15000);
 
-    assertThat(buffer.capacity()).isGreaterThanOrEqualTo(4096);
-    assertThat(newBuffer.capacity()).isGreaterThanOrEqualTo(32768);
+    assertThat(buffer.capacity()).isEqualTo(1000);
+    assertThat(newBuffer.capacity()).isEqualTo(15000);
+    assertThat(buffer.isDirect()).isTrue();
+    assertThat(newBuffer.isDirect()).isTrue();
+    assertThat(bufferPool.getPoolableBuffer(buffer).capacity())
+        .isGreaterThanOrEqualTo(BufferPool.SMALL_BUFFER_SIZE);
+    assertThat(bufferPool.getPoolableBuffer(newBuffer).capacity())
+        .isGreaterThanOrEqualTo(BufferPool.MEDIUM_BUFFER_SIZE);
 
     assertThat(buffer.position()).isEqualTo(0);
     assertThat(buffer.limit()).isEqualTo(1000);
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
index b4eae2f..3d394fb 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
@@ -57,13 +57,12 @@ public class NioPlainEngineTest {
   @Test
   public void ensureWrappedCapacity() {
     ByteBuffer wrappedBuffer = bufferPool.acquireDirectReceiveBuffer(100);
-    verify(mockStats, times(1)).incReceiverBufferSize(any(Integer.class), 
any(Boolean.class));
     wrappedBuffer.put(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
     nioEngine.lastReadPosition = 10;
     int requestedCapacity = 210;
     ByteBuffer result = nioEngine.ensureWrappedCapacity(requestedCapacity, 
wrappedBuffer,
         BufferPool.BufferType.TRACKED_RECEIVER);
-    verify(mockStats, times(1)).incReceiverBufferSize(any(Integer.class), 
any(Boolean.class));
+    verify(mockStats, times(2)).incReceiverBufferSize(any(Integer.class), 
any(Boolean.class));
     assertThat(result.capacity()).isGreaterThanOrEqualTo(requestedCapacity);
     assertThat(result).isGreaterThanOrEqualTo(wrappedBuffer);
     // make sure that data was transferred to the new buffer
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/tcp/MsgStreamerTest.java 
b/geode-core/src/test/java/org/apache/geode/internal/tcp/MsgStreamerTest.java
index 8094a37..1631b08 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/tcp/MsgStreamerTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/tcp/MsgStreamerTest.java
@@ -88,7 +88,9 @@ public class MsgStreamerTest {
 
     when(connection1.getRemoteAddress()).thenReturn(member1);
     when(connection1.getRemoteVersion()).thenReturn(Version.CURRENT);
+    
when(connection1.getSendBufferSize()).thenReturn(Connection.SMALL_BUFFER_SIZE);
     when(connection2.getRemoteAddress()).thenReturn(member2);
+    
when(connection2.getSendBufferSize()).thenReturn(Connection.SMALL_BUFFER_SIZE);
     if (mixedDestinationVersions) {
       when(connection1.getRemoteVersion()).thenReturn(Version.GEODE_1_12_0);
     } else {

Reply via email to