This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 35f8da66f9 Reduce memory allocations of calls to ByteBufer.duplicate() 
made in org.apache.cassandra.transport.CBUtil#writeValue
35f8da66f9 is described below

commit 35f8da66f9b05d18b4177f8d2e1b86c772ad2221
Author: Natnael Adere <[email protected]>
AuthorDate: Thu Mar 2 11:50:07 2023 -0800

    Reduce memory allocations of calls to ByteBufer.duplicate() made in 
org.apache.cassandra.transport.CBUtil#writeValue
    
    patch by Natnael Adere; reviewed by Benedict Elliott Smith, David Capwell 
for CASSANDRA-18212
---
 CHANGES.txt                                        |  1 +
 .../org/apache/cassandra/transport/CBUtil.java     | 45 ++++++++++++++++-
 .../apache/cassandra/transport/WriteBytesTest.java | 58 ++++++++++++++++++++++
 .../org/apache/cassandra/utils/Generators.java     | 34 ++++++++++++-
 4 files changed, 134 insertions(+), 4 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 72bca4b3c7..4563765ebc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.2
+ * Reduce memory allocations of calls to ByteBufer.duplicate() made in 
org.apache.cassandra.transport.CBUtil#writeValue (CASSANDRA-18212)
  * CEP-17: SSTable API (CASSANDRA-17056)
  * Gossip stateMapOrdering does not have correct ordering when both 
EndpointState are in the bootstrapping set (CASSANDRA-18292)
  * Snapshot only sstables containing mismatching ranges on preview repair 
mismatch (CASSANDRA-17561)
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java 
b/src/java/org/apache/cassandra/transport/CBUtil.java
index 6cab63855e..bdb1a909ad 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -32,7 +32,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.ByteBufUtil;
@@ -46,6 +45,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.TimeUUID;
 import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.memory.MemoryUtil;
 
 /**
  * ByteBuf utility methods.
@@ -69,6 +69,15 @@ public abstract class CBUtil
         }
     };
 
+    private final static FastThreadLocal<ByteBuffer> localDirectBuffer = new 
FastThreadLocal<ByteBuffer>()
+    {
+        @Override
+        protected ByteBuffer initialValue()
+        {
+            return MemoryUtil.getHollowDirectByteBuffer();
+        }
+    };
+
     private final static FastThreadLocal<CharBuffer> TL_CHAR_BUFFER = new 
FastThreadLocal<>();
 
     private CBUtil() {}
@@ -478,7 +487,35 @@ public abstract class CBUtil
         cb.writeInt(remaining);
 
         if (remaining > 0)
-            cb.writeBytes(bytes.duplicate());
+            addBytes(bytes, cb);
+    }
+
+    public static void addBytes(ByteBuffer src, ByteBuf dest)
+    {
+        if (src.remaining() == 0)
+            return;
+
+        int length = src.remaining();
+
+        if (src.hasArray())
+        {
+            // Heap buffers are copied using a raw array instead of shared 
heap buffer and MemoryUtil.unsafe to avoid a CMS bug, which causes the JVM to 
crash with the follwing:
+            // # Problematic frame:
+            // # V  [libjvm.dylib+0x63e858]  void 
ParScanClosure::do_oop_work<unsigned int>(unsigned int*, bool, bool)+0x94
+            // More details can be found here: 
https://bugs.openjdk.org/browse/JDK-8222798
+            byte[] array = src.array();
+            dest.writeBytes(array, src.arrayOffset() + src.position(), length);
+        }
+        else if (src.isDirect())
+        {
+            ByteBuffer local = getLocalDirectBuffer();
+            MemoryUtil.duplicateDirectByteBuffer(src, local);
+            dest.writeBytes(local);
+        }
+        else
+        {
+            dest.writeBytes(src.duplicate());
+        }
     }
 
     public static int sizeOfValue(byte[] bytes)
@@ -614,4 +651,8 @@ public abstract class CBUtil
         return bytes;
     }
 
+    private static ByteBuffer getLocalDirectBuffer()
+    {
+        return localDirectBuffer.get();
+    }
 }
diff --git a/test/unit/org/apache/cassandra/transport/WriteBytesTest.java 
b/test/unit/org/apache/cassandra/transport/WriteBytesTest.java
new file mode 100644
index 0000000000..1dd1c796e8
--- /dev/null
+++ b/test/unit/org/apache/cassandra/transport/WriteBytesTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport;
+
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.Generators;
+import org.assertj.core.api.Assertions;
+
+import static org.quicktheories.QuickTheory.qt;
+
+
+public class WriteBytesTest
+{
+    @Test
+    public void test()
+    {
+        int maxBytes = 10_000;
+        ByteBuf buf = Unpooled.buffer(maxBytes);
+        qt().forAll(Generators.bytesAnyType(0, maxBytes)).checkAssert(bb -> {
+            buf.clear();
+
+            int size = bb.remaining();
+            int pos = bb.position();
+
+            CBUtil.addBytes(bb, buf);
+
+            // test for consumption
+            Assertions.assertThat(bb.remaining()).isEqualTo(size);
+            Assertions.assertThat(bb.position()).isEqualTo(pos);
+
+            Assertions.assertThat(buf.writerIndex()).isEqualTo(size);
+            for (int i = 0; i < size; i++)
+                Assertions.assertThat(buf.getByte(buf.readerIndex() + 
i)).describedAs("byte mismatch at index %d", i).isEqualTo(bb.get(bb.position() 
+ i));
+            FileUtils.clean(bb);
+        });
+    }
+
+}
diff --git a/test/unit/org/apache/cassandra/utils/Generators.java 
b/test/unit/org/apache/cassandra/utils/Generators.java
index 179c0f4155..e25f9c0fe3 100644
--- a/test/unit/org/apache/cassandra/utils/Generators.java
+++ b/test/unit/org/apache/cassandra/utils/Generators.java
@@ -299,6 +299,16 @@ public final class Generators
     }
 
     public static Gen<ByteBuffer> bytes(int min, int max)
+    {
+        return bytes(min, max, SourceDSL.arbitrary().constant(BBCases.HEAP));
+    }
+
+    public static Gen<ByteBuffer> bytesAnyType(int min, int max)
+    {
+        return bytes(min, max, 
SourceDSL.arbitrary().enumValues(BBCases.class));
+    }
+
+    private static Gen<ByteBuffer> bytes(int min, int max, Gen<BBCases> cases)
     {
         if (min < 0)
             throw new IllegalArgumentException("Asked for negative bytes; 
given " + min);
@@ -314,11 +324,31 @@ public final class Generators
             // to add more randomness, also shift offset in the array so the 
same size doesn't yield the same bytes
             int offset = (int) rnd.next(Constraint.between(0, MAX_BLOB_LENGTH 
- size));
 
-            return ByteBuffer.wrap(LazySharedBlob.SHARED_BYTES, offset, size);
+            return handleCases(cases, rnd, offset, size);
         };
+    };
+
+    private enum BBCases { HEAP, READ_ONLY_HEAP, DIRECT, READ_ONLY_DIRECT }
+
+    private static ByteBuffer handleCases(Gen<BBCases> cases, RandomnessSource 
rnd, int offset, int size) {
+        switch (cases.generate(rnd))
+        {
+            case HEAP: return ByteBuffer.wrap(LazySharedBlob.SHARED_BYTES, 
offset, size);
+            case READ_ONLY_HEAP: return 
ByteBuffer.wrap(LazySharedBlob.SHARED_BYTES, offset, size).asReadOnlyBuffer();
+            case DIRECT: return directBufferFromSharedBlob(offset, size);
+            case READ_ONLY_DIRECT: return directBufferFromSharedBlob(offset, 
size).asReadOnlyBuffer();
+            default: throw new AssertionError("can't wait for jdk 17!");
+        }
     }
 
-    /**
+    private static ByteBuffer directBufferFromSharedBlob(int offset, int size) 
{
+        ByteBuffer bb = ByteBuffer.allocateDirect(size);
+        bb.put(LazySharedBlob.SHARED_BYTES, offset, size);
+        bb.flip();
+        return bb;
+    }
+
+     /**
      * Implements a valid utf-8 generator.
      *
      * Implementation note, currently relies on getBytes to strip out 
non-valid utf-8 chars, so is slow


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to