This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 35f8da66f9 Reduce memory allocations of calls to ByteBuffer.duplicate()
made in org.apache.cassandra.transport.CBUtil#writeValue
35f8da66f9 is described below
commit 35f8da66f9b05d18b4177f8d2e1b86c772ad2221
Author: Natnael Adere <[email protected]>
AuthorDate: Thu Mar 2 11:50:07 2023 -0800
Reduce memory allocations of calls to ByteBuffer.duplicate() made in
org.apache.cassandra.transport.CBUtil#writeValue
patch by Natnael Adere; reviewed by Benedict Elliott Smith, David Capwell
for CASSANDRA-18212
---
CHANGES.txt | 1 +
.../org/apache/cassandra/transport/CBUtil.java | 45 ++++++++++++++++-
.../apache/cassandra/transport/WriteBytesTest.java | 58 ++++++++++++++++++++++
.../org/apache/cassandra/utils/Generators.java | 34 ++++++++++++-
4 files changed, 134 insertions(+), 4 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 72bca4b3c7..4563765ebc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.2
+ * Reduce memory allocations of calls to ByteBuffer.duplicate() made in
org.apache.cassandra.transport.CBUtil#writeValue (CASSANDRA-18212)
* CEP-17: SSTable API (CASSANDRA-17056)
* Gossip stateMapOrdering does not have correct ordering when both
EndpointState are in the bootstrapping set (CASSANDRA-18292)
* Snapshot only sstables containing mismatching ranges on preview repair
mismatch (CASSANDRA-17561)
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java
b/src/java/org/apache/cassandra/transport/CBUtil.java
index 6cab63855e..bdb1a909ad 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -32,7 +32,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
@@ -46,6 +45,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.memory.MemoryUtil;
/**
* ByteBuf utility methods.
@@ -69,6 +69,15 @@ public abstract class CBUtil
}
};
+ private final static FastThreadLocal<ByteBuffer> localDirectBuffer = new
FastThreadLocal<ByteBuffer>()
+ {
+ @Override
+ protected ByteBuffer initialValue()
+ {
+ return MemoryUtil.getHollowDirectByteBuffer();
+ }
+ };
+
private final static FastThreadLocal<CharBuffer> TL_CHAR_BUFFER = new
FastThreadLocal<>();
private CBUtil() {}
@@ -478,7 +487,35 @@ public abstract class CBUtil
cb.writeInt(remaining);
if (remaining > 0)
- cb.writeBytes(bytes.duplicate());
+ addBytes(bytes, cb);
+ }
+
+ public static void addBytes(ByteBuffer src, ByteBuf dest)
+ {
+ if (src.remaining() == 0)
+ return;
+
+ int length = src.remaining();
+
+ if (src.hasArray())
+ {
+ // Heap buffers are copied using a raw array instead of shared
heap buffer and MemoryUtil.unsafe to avoid a CMS bug, which causes the JVM to
crash with the following:
+ // # Problematic frame:
+ // # V [libjvm.dylib+0x63e858] void
ParScanClosure::do_oop_work<unsigned int>(unsigned int*, bool, bool)+0x94
+ // More details can be found here:
https://bugs.openjdk.org/browse/JDK-8222798
+ byte[] array = src.array();
+ dest.writeBytes(array, src.arrayOffset() + src.position(), length);
+ }
+ else if (src.isDirect())
+ {
+ ByteBuffer local = getLocalDirectBuffer();
+ MemoryUtil.duplicateDirectByteBuffer(src, local);
+ dest.writeBytes(local);
+ }
+ else
+ {
+ dest.writeBytes(src.duplicate());
+ }
}
public static int sizeOfValue(byte[] bytes)
@@ -614,4 +651,8 @@ public abstract class CBUtil
return bytes;
}
+ private static ByteBuffer getLocalDirectBuffer()
+ {
+ return localDirectBuffer.get();
+ }
}
diff --git a/test/unit/org/apache/cassandra/transport/WriteBytesTest.java
b/test/unit/org/apache/cassandra/transport/WriteBytesTest.java
new file mode 100644
index 0000000000..1dd1c796e8
--- /dev/null
+++ b/test/unit/org/apache/cassandra/transport/WriteBytesTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport;
+
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.Generators;
+import org.assertj.core.api.Assertions;
+
+import static org.quicktheories.QuickTheory.qt;
+
+
+public class WriteBytesTest
+{
+ @Test
+ public void test()
+ {
+ int maxBytes = 10_000;
+ ByteBuf buf = Unpooled.buffer(maxBytes);
+ qt().forAll(Generators.bytesAnyType(0, maxBytes)).checkAssert(bb -> {
+ buf.clear();
+
+ int size = bb.remaining();
+ int pos = bb.position();
+
+ CBUtil.addBytes(bb, buf);
+
+ // test for consumption
+ Assertions.assertThat(bb.remaining()).isEqualTo(size);
+ Assertions.assertThat(bb.position()).isEqualTo(pos);
+
+ Assertions.assertThat(buf.writerIndex()).isEqualTo(size);
+ for (int i = 0; i < size; i++)
+ Assertions.assertThat(buf.getByte(buf.readerIndex() +
i)).describedAs("byte mismatch at index %d", i).isEqualTo(bb.get(bb.position()
+ i));
+ FileUtils.clean(bb);
+ });
+ }
+
+}
diff --git a/test/unit/org/apache/cassandra/utils/Generators.java
b/test/unit/org/apache/cassandra/utils/Generators.java
index 179c0f4155..e25f9c0fe3 100644
--- a/test/unit/org/apache/cassandra/utils/Generators.java
+++ b/test/unit/org/apache/cassandra/utils/Generators.java
@@ -299,6 +299,16 @@ public final class Generators
}
public static Gen<ByteBuffer> bytes(int min, int max)
+ {
+ return bytes(min, max, SourceDSL.arbitrary().constant(BBCases.HEAP));
+ }
+
+ public static Gen<ByteBuffer> bytesAnyType(int min, int max)
+ {
+ return bytes(min, max,
SourceDSL.arbitrary().enumValues(BBCases.class));
+ }
+
+ private static Gen<ByteBuffer> bytes(int min, int max, Gen<BBCases> cases)
{
if (min < 0)
throw new IllegalArgumentException("Asked for negative bytes;
given " + min);
@@ -314,11 +324,31 @@ public final class Generators
// to add more randomness, also shift offset in the array so the
same size doesn't yield the same bytes
int offset = (int) rnd.next(Constraint.between(0, MAX_BLOB_LENGTH
- size));
- return ByteBuffer.wrap(LazySharedBlob.SHARED_BYTES, offset, size);
+ return handleCases(cases, rnd, offset, size);
};
+ };
+
+ private enum BBCases { HEAP, READ_ONLY_HEAP, DIRECT, READ_ONLY_DIRECT }
+
+ private static ByteBuffer handleCases(Gen<BBCases> cases, RandomnessSource
rnd, int offset, int size) {
+ switch (cases.generate(rnd))
+ {
+ case HEAP: return ByteBuffer.wrap(LazySharedBlob.SHARED_BYTES,
offset, size);
+ case READ_ONLY_HEAP: return
ByteBuffer.wrap(LazySharedBlob.SHARED_BYTES, offset, size).asReadOnlyBuffer();
+ case DIRECT: return directBufferFromSharedBlob(offset, size);
+ case READ_ONLY_DIRECT: return directBufferFromSharedBlob(offset,
size).asReadOnlyBuffer();
+ default: throw new AssertionError("can't wait for jdk 17!");
+ }
}
- /**
+ private static ByteBuffer directBufferFromSharedBlob(int offset, int size)
{
+ ByteBuffer bb = ByteBuffer.allocateDirect(size);
+ bb.put(LazySharedBlob.SHARED_BYTES, offset, size);
+ bb.flip();
+ return bb;
+ }
+
+ /**
* Implements a valid utf-8 generator.
*
* Implementation note, currently relies on getBytes to strip out
non-valid utf-8 chars, so is slow
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]