This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.12 by this push:
     new caf1c47  GEODE-8506: BufferPool returns byte buffers that may be much larger t… (#5525)
caf1c47 is described below

commit caf1c4783478c2b5c5e5c239874092346316f4fe
Author: Bruce Schuchardt <[email protected]>
AuthorDate: Fri Sep 18 14:21:55 2020 -0700

    GEODE-8506: BufferPool returns byte buffers that may be much larger t… (#5525)
    
    * GEODE-8506: BufferPool returns byte buffers that may be much larger than requested
    
    Create a "slice" of the acquired buffer to return to the caller instead
    of returning a buffer larger than what was requested.  On return we fish
    out the parent buffer and put it back in the pool.
    
    * cache reflection method, remove magic numbers in test and BufferPool
    
    (cherry picked from commit a11b9c076a72609ff00802c010b6e32262228776)
---
 .../distributed/internal/DistributionConfig.java   |  2 +-
 .../org/apache/geode/internal/net/BufferPool.java  | 75 ++++++++++++++++++----
 .../apache/geode/internal/net/BufferPoolTest.java  | 53 +++++++++++++++
 .../geode/internal/net/NioPlainEngineTest.java     |  1 -
 4 files changed, 115 insertions(+), 16 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java
index 89a19a6..0074881 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java
@@ -1386,7 +1386,7 @@ public interface DistributionConfig extends Config, LogConfig, StatisticsConfig
   /**
    * The default value of the {@link ConfigurationProperties#SOCKET_BUFFER_SIZE} property
    */
-  int DEFAULT_SOCKET_BUFFER_SIZE = 32768;
+  static int DEFAULT_SOCKET_BUFFER_SIZE = 32768;
   /**
    * The minimum {@link ConfigurationProperties#SOCKET_BUFFER_SIZE}.
    * <p>
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
index 0997c6e..e119250 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
@@ -15,16 +15,21 @@
 package org.apache.geode.internal.net;
 
 import java.lang.ref.SoftReference;
+import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.util.IdentityHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+import org.apache.geode.InternalGemFireException;
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.internal.Assert;
 
 public class BufferPool {
   private final DMStats stats;
 
+  private Method parentOfSliceMethod;
+
   /**
    * Buffers may be acquired from the Buffers pool
    * or they may be allocated using Buffer.allocate(). This enum is used
@@ -85,21 +90,27 @@ public class BufferPool {
               stats.incReceiverBufferSize(-refSize, true);
             }
           }
-        } else if (bb.capacity() >= size) {
-          bb.rewind();
-          bb.limit(size);
-          return bb;
         } else {
-          // wasn't big enough so put it back in the queue
-          Assert.assertTrue(bufferQueue.offer(ref));
-          if (alreadySeen == null) {
-            alreadySeen = new IdentityHashMap<>();
-          }
-          if (alreadySeen.put(ref, ref) != null) {
-            // if it returns non-null then we have already seen this item
-            // so we have worked all the way through the queue once.
-            // So it is time to give up and allocate a new buffer.
-            break;
+          int capacity = bb.capacity();
+          if (capacity > size) {
+            bb.position(0).limit(size);
+            return bb.slice();
+          } else if (capacity == size) {
+            bb.rewind();
+            bb.limit(size);
+            return bb;
+          } else {
+            // wasn't big enough so put it back in the queue
+            Assert.assertTrue(bufferQueue.offer(ref));
+            if (alreadySeen == null) {
+              alreadySeen = new IdentityHashMap<>();
+            }
+            if (alreadySeen.put(ref, ref) != null) {
+              // if it returns non-null then we have already seen this item
+              // so we have worked all the way through the queue once.
+              // So it is time to give up and allocate a new buffer.
+              break;
+            }
           }
         }
         ref = bufferQueue.poll();
@@ -227,6 +238,7 @@ public class BufferPool {
    */
   private void releaseBuffer(ByteBuffer bb, boolean send) {
     if (bb.isDirect()) {
+      bb = getPoolableBuffer(bb);
       BBSoftReference bbRef = new BBSoftReference(bb, send);
       bufferQueue.offer(bbRef);
     } else {
@@ -239,6 +251,41 @@ public class BufferPool {
   }
 
   /**
+   * If we hand out a buffer that is larger than the requested size we create a
+   * "slice" of the buffer having the requested capacity and hand that out instead.
+   * When we put the buffer back in the pool we need to find the original, non-sliced,
+   * buffer. This is held in DirectBuffer in its "attachment" field, which is a public
+   * method, though DirectBuffer is package-private.
+   */
+  @VisibleForTesting
+  public ByteBuffer getPoolableBuffer(ByteBuffer buffer) {
+    ByteBuffer result = buffer;
+    if (parentOfSliceMethod == null) {
+      Class clazz = buffer.getClass();
+      try {
+        Method method = clazz.getMethod("attachment");
+        method.setAccessible(true);
+        parentOfSliceMethod = method;
+      } catch (Exception e) {
+        throw new InternalGemFireException("unable to retrieve underlying byte buffer", e);
+      }
+    }
+    try {
+      Object attachment = parentOfSliceMethod.invoke(buffer);
+      if (attachment instanceof ByteBuffer) {
+        result = (ByteBuffer) attachment;
+      } else if (attachment != null) {
+        throw new InternalGemFireException(
+            "direct byte buffer attachment was not a byte buffer but a " +
+                attachment.getClass().getName());
+      }
+    } catch (Exception e) {
+      throw new InternalGemFireException("unable to retrieve underlying byte buffer", e);
+    }
+    return result;
+  }
+
+  /**
    * A soft reference that remembers the size of the byte buffer it refers to. TODO Dan - I really
    * think this should be a weak reference. The JVM doesn't seem to clear soft references if it is
    * getting low on direct memory.
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java
index cc441e4..dec5553 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java
@@ -113,4 +113,57 @@ public class BufferPoolTest {
     assertThat(newBuffer.position()).isEqualTo(16384);
     assertThat(newBuffer.limit()).isEqualTo(newBuffer.capacity());
   }
+
+  @Test
+  public void checkBufferSizeAfterAllocation() throws Exception {
+    ByteBuffer buffer = bufferPool.acquireDirectReceiveBuffer(100);
+
+    ByteBuffer newBuffer =
+        bufferPool.acquireDirectReceiveBuffer(10000);
+    assertThat(buffer.isDirect()).isTrue();
+    assertThat(newBuffer.isDirect()).isTrue();
+    assertThat(buffer.capacity()).isEqualTo(100);
+    assertThat(newBuffer.capacity()).isEqualTo(10000);
+
+    // buffer should be ready to read the same amount of data
+    assertThat(buffer.position()).isEqualTo(0);
+    assertThat(buffer.limit()).isEqualTo(100);
+    assertThat(newBuffer.position()).isEqualTo(0);
+    assertThat(newBuffer.limit()).isEqualTo(10000);
+  }
+
+  @Test
+  public void checkBufferSizeAfterAcquire() throws Exception {
+    ByteBuffer buffer = bufferPool.acquireDirectReceiveBuffer(100);
+
+    ByteBuffer newBuffer =
+        bufferPool.acquireDirectReceiveBuffer(10000);
+    assertThat(buffer.capacity()).isEqualTo(100);
+    assertThat(newBuffer.capacity()).isEqualTo(10000);
+    assertThat(buffer.isDirect()).isTrue();
+    assertThat(newBuffer.isDirect()).isTrue();
+
+    assertThat(buffer.position()).isEqualTo(0);
+    assertThat(buffer.limit()).isEqualTo(100);
+    assertThat(newBuffer.position()).isEqualTo(0);
+    assertThat(newBuffer.limit()).isEqualTo(10000);
+
+    bufferPool.releaseReceiveBuffer(buffer);
+    bufferPool.releaseReceiveBuffer(newBuffer);
+
+    buffer = bufferPool.acquireDirectReceiveBuffer(1000);
+    newBuffer =
+        bufferPool.acquireDirectReceiveBuffer(15000);
+
+    assertThat(buffer.capacity()).isEqualTo(1000);
+    assertThat(newBuffer.capacity()).isEqualTo(15000);
+    assertThat(buffer.isDirect()).isTrue();
+    assertThat(newBuffer.isDirect()).isTrue();
+
+    assertThat(buffer.position()).isEqualTo(0);
+    assertThat(buffer.limit()).isEqualTo(1000);
+    assertThat(newBuffer.position()).isEqualTo(0);
+    assertThat(newBuffer.limit()).isEqualTo(15000);
+  }
+
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
index e9785de..166c2a5 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
@@ -57,7 +57,6 @@ public class NioPlainEngineTest {
   @Test
   public void ensureWrappedCapacity() {
     ByteBuffer wrappedBuffer = bufferPool.acquireDirectReceiveBuffer(100);
-    verify(mockStats, times(1)).incReceiverBufferSize(any(Integer.class), any(Boolean.class));
     wrappedBuffer.put(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
     nioEngine.lastReadPosition = 10;
     int requestedCapacity = 210;

Reply via email to