Repository: cassandra Updated Branches: refs/heads/trunk a5f70ad4b -> 6bea7de50
Reduce memory copies and object creations when acting on ByteBufs patch by Norman Maurer; reviewed by jasobrown for CASSANDRA-13789 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6bea7de5 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6bea7de5 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6bea7de5 Branch: refs/heads/trunk Commit: 6bea7de50418a7a7b9e2b8dcf2446701d9cc5b69 Parents: a5f70ad Author: Norman Maurer <[email protected]> Authored: Tue Aug 22 14:15:16 2017 +0200 Committer: Jason Brown <[email protected]> Committed: Thu Aug 24 14:57:37 2017 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/cql3/QueryOptions.java | 2 +- .../org/apache/cassandra/cql3/ResultSet.java | 2 +- .../org/apache/cassandra/transport/CBUtil.java | 56 +++++++++++++------- 4 files changed, 41 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bea7de5/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6b3ea2b..d1ef3c9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Reduce memory copies and object creations when acting on ByteBufs (CASSANDRA-13789) * simplify mx4j configuration (Cassandra-13578) * Fix trigger example on 4.0 (CASSANDRA-13796) * force minumum timeout value (CASSANDRA-9375) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bea7de5/src/java/org/apache/cassandra/cql3/QueryOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java b/src/java/org/apache/cassandra/cql3/QueryOptions.java index ac80729..e4d443d 100644 --- a/src/java/org/apache/cassandra/cql3/QueryOptions.java +++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java @@ -429,7 +429,7 @@ public abstract class QueryOptions if (!flags.isEmpty()) { int pageSize = flags.contains(Flag.PAGE_SIZE) ? body.readInt() : -1; - PagingState pagingState = flags.contains(Flag.PAGING_STATE) ? PagingState.deserialize(CBUtil.readValue(body), version) : null; + PagingState pagingState = flags.contains(Flag.PAGING_STATE) ? PagingState.deserialize(CBUtil.readValueNoCopy(body), version) : null; ConsistencyLevel serialConsistency = flags.contains(Flag.SERIAL_CONSISTENCY) ? CBUtil.readConsistencyLevel(body) : ConsistencyLevel.SERIAL; long timestamp = Long.MIN_VALUE; if (flags.contains(Flag.TIMESTAMP)) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bea7de5/src/java/org/apache/cassandra/cql3/ResultSet.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/ResultSet.java b/src/java/org/apache/cassandra/cql3/ResultSet.java index 5ed4eee..2bb9997 100644 --- a/src/java/org/apache/cassandra/cql3/ResultSet.java +++ b/src/java/org/apache/cassandra/cql3/ResultSet.java @@ -310,7 +310,7 @@ public class ResultSet PagingState state = null; if (flags.contains(Flag.HAS_MORE_PAGES)) - state = PagingState.deserialize(CBUtil.readValue(body), version); + state = PagingState.deserialize(CBUtil.readValueNoCopy(body), version); if (flags.contains(Flag.NO_METADATA)) return new ResultMetadata(flags, null, columnCount, state); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bea7de5/src/java/org/apache/cassandra/transport/CBUtil.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java index 824ad26..d5d733c 100644 --- a/src/java/org/apache/cassandra/transport/CBUtil.java +++ b/src/java/org/apache/cassandra/transport/CBUtil.java @@ -58,6 +58,7 @@ public abstract class CBUtil { public static final boolean USE_HEAP_ALLOCATOR = Boolean.getBoolean(Config.PROPERTY_PREFIX + "netty_use_heap_allocator"); public static final ByteBufAllocator allocator = USE_HEAP_ALLOCATOR ? new UnpooledByteBufAllocator(false) : new PooledByteBufAllocator(true); + private static final int UUID_SIZE = 16; private final static FastThreadLocal<CharsetDecoder> TL_UTF8_DECODER = new FastThreadLocal<CharsetDecoder>() { @@ -139,9 +140,9 @@ public abstract class CBUtil public static void writeString(String str, ByteBuf cb) { int writerIndex = cb.writerIndex(); - cb.writeShort(0); - int lengthBytes = ByteBufUtil.writeUtf8(cb, str); - cb.setShort(writerIndex, lengthBytes); + cb.writerIndex(writerIndex + 2); + int written = ByteBufUtil.writeUtf8(cb, str); + cb.setShort(writerIndex, written); } public static int sizeOfString(String str) @@ -164,14 +165,15 @@ public abstract class CBUtil public static void writeLongString(String str, ByteBuf cb) { - byte[] bytes = str.getBytes(CharsetUtil.UTF_8); - cb.writeInt(bytes.length); - cb.writeBytes(bytes); + int writerIndex = cb.writerIndex(); + cb.writerIndex(writerIndex + 4); + int written = ByteBufUtil.writeUtf8(cb, str); + cb.setInt(writerIndex, written); } public static int sizeOfLongString(String str) { - return 4 + str.getBytes(CharsetUtil.UTF_8).length; + return 4 + TypeSizes.encodedUTF8Length(str); } public static byte[] readBytes(ByteBuf cb) @@ -274,9 +276,9 @@ public abstract class CBUtil public static UUID readUUID(ByteBuf cb) { - byte[] bytes = new byte[16]; - cb.readBytes(bytes); - return UUIDGen.getUUID(ByteBuffer.wrap(bytes)); + ByteBuffer buffer = cb.nioBuffer(cb.readerIndex(), UUID_SIZE); + cb.skipBytes(buffer.remaining()); + return UUIDGen.getUUID(buffer); } public static void writeUUID(UUID uuid, ByteBuf cb) @@ -286,7 +288,7 @@ public abstract class CBUtil public static int sizeOfUUID(UUID uuid) { - return 16; + return UUID_SIZE; } public static List<String> readStringList(ByteBuf cb) @@ -386,9 +388,19 @@ public abstract class CBUtil int length = cb.readInt(); if (length < 0) return null; - ByteBuf slice = cb.readSlice(length); - return ByteBuffer.wrap(readRawBytes(slice)); + return ByteBuffer.wrap(readRawBytes(cb, length)); + } + + public static ByteBuffer readValueNoCopy(ByteBuf cb) + { + int length = cb.readInt(); + if (length < 0) + return null; + + ByteBuffer buffer = cb.nioBuffer(cb.readerIndex(), length); + cb.skipBytes(length); + return buffer; } public static ByteBuffer readBoundValue(ByteBuf cb, ProtocolVersion protocolVersion) @@ -405,9 +417,7 @@ public abstract class CBUtil else throw new ProtocolException("Invalid ByteBuf length " + length); } - ByteBuf slice = cb.readSlice(length); - - return ByteBuffer.wrap(readRawBytes(slice)); + return ByteBuffer.wrap(readRawBytes(cb, length)); } public static void writeValue(byte[] bytes, ByteBuf cb) @@ -434,7 +444,12 @@ public abstract class CBUtil cb.writeInt(remaining); if (remaining > 0) - cb.writeBytes(bytes.duplicate()); + { + // write and reset original position so we not need to create a duplicate. + int position = bytes.position(); + cb.writeBytes(bytes); + bytes.position(position); + } } public static int sizeOfValue(byte[] bytes) @@ -560,7 +575,12 @@ public abstract class CBUtil */ public static byte[] readRawBytes(ByteBuf cb) { - byte[] bytes = new byte[cb.readableBytes()]; + return readRawBytes(cb, cb.readableBytes()); + } + + private static byte[] readRawBytes(ByteBuf cb, int length) + { + byte[] bytes = new byte[length]; cb.readBytes(bytes); return bytes; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
