This is an automated email from the ASF dual-hosted git repository. dragonyliu pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 5a03548bbd6c172c83930d5db4352fa435d6826d Author: William Song <[email protected]> AuthorDate: Thu Oct 13 18:25:53 2022 +0800 RATIS-1717. Perf: Use global serialize buf to avoid temp buf (#762) (cherry picked from commit 2753f3d6207a88f5a5f193a28371286db4e8b398) --- .../raftlog/segmented/BufferedWriteChannel.java | 7 +++++-- .../segmented/SegmentedRaftLogOutputStream.java | 21 ++++++++++++++++----- .../raftlog/segmented/SegmentedRaftLogWorker.java | 6 +++++- 3 files changed, 26 insertions(+), 8 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java index f6c10ea41..ef9987ff7 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java @@ -63,9 +63,12 @@ class BufferedWriteChannel implements Closeable { } void write(byte[] b) throws IOException { + write(b, b.length); + } + void write(byte[] b, int len) throws IOException { int offset = 0; - while (offset < b.length) { - int toPut = Math.min(b.length - offset, writeBuffer.remaining()); + while (offset < len) { + int toPut = Math.min(len - offset, writeBuffer.remaining()); writeBuffer.put(b, offset, toPut); offset += toPut; if (writeBuffer.remaining() == 0) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java index 22eebac93..e0fd41fbd 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java @@ -34,6 +34,7 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; import java.util.zip.Checksum; public class SegmentedRaftLogOutputStream implements Closeable { @@ -52,6 +53,7 @@ public class SegmentedRaftLogOutputStream implements Closeable { private final File file; private final BufferedWriteChannel out; // buffered FileChannel for writing private final Checksum checksum; + private final Supplier<byte[]> sharedBuffer; private final long segmentMaxSize; private final long preallocatedSize; @@ -59,10 +61,17 @@ public class SegmentedRaftLogOutputStream implements Closeable { public SegmentedRaftLogOutputStream(File file, boolean append, long segmentMaxSize, long preallocatedSize, ByteBuffer byteBuffer) throws IOException { + this(file, append, segmentMaxSize, preallocatedSize, byteBuffer, null); + } + + SegmentedRaftLogOutputStream(File file, boolean append, long segmentMaxSize, + long preallocatedSize, ByteBuffer byteBuffer, Supplier<byte[]> sharedBuffer) + throws IOException { this.file = file; this.checksum = new PureJavaCrc32C(); this.segmentMaxSize = segmentMaxSize; this.preallocatedSize = preallocatedSize; + this.sharedBuffer = sharedBuffer; this.out = BufferedWriteChannel.open(file, append, byteBuffer); if (!append) { @@ -75,12 +84,12 @@ public class SegmentedRaftLogOutputStream implements Closeable { /** * Write the given entry to this output stream. - * + * <p> * Format: * (1) The serialized size of the entry. * (2) The entry. * (3) 4-byte checksum of the entry. - * + * <p> * Size in bytes to be written: * (size to encode n) + n + (checksum size), * where n is the entry serialized size and the checksum size is 4. @@ -88,8 +97,10 @@ public class SegmentedRaftLogOutputStream implements Closeable { public void write(LogEntryProto entry) throws IOException { final int serialized = entry.getSerializedSize(); final int proto = CodedOutputStream.computeUInt32SizeNoTag(serialized) + serialized; - final byte[] buf = new byte[proto + 4]; // proto and 4-byte checksum - preallocateIfNecessary(buf.length); + final int total = proto + 4; // proto and 4-byte checksum + final byte[] buf = sharedBuffer != null? sharedBuffer.get(): new byte[total]; + Preconditions.assertTrue(total <= buf.length, () -> "total = " + total + " > buf.length " + buf.length); + preallocateIfNecessary(total); CodedOutputStream cout = CodedOutputStream.newInstance(buf); cout.writeUInt32NoTag(serialized); @@ -99,7 +110,7 @@ public class SegmentedRaftLogOutputStream implements Closeable { checksum.update(buf, 0, proto); ByteBuffer.wrap(buf, proto, 4).putInt((int) checksum.getValue()); - out.write(buf); + out.write(buf, total); } @Override diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java index 516b01c7b..7bc673a79 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java @@ -154,6 +154,7 @@ class SegmentedRaftLogWorker { private final Timer raftLogEnqueueingDelayTimer; private final SegmentedRaftLogMetrics raftLogMetrics; private final ByteBuffer writeBuffer; + private final Supplier<byte[]> sharedBuffer; /** * The number of entries that have been written into the SegmentedRaftLogOutputStream but @@ -217,6 +218,9 @@ class SegmentedRaftLogWorker { final int bufferSize = RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt(); this.writeBuffer = ByteBuffer.allocateDirect(bufferSize); + final int logEntryLimit = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties).getSizeInt(); + // 4 bytes (serialized size) + logEntryLimit + 4 bytes (checksum) + this.sharedBuffer = MemoizedSupplier.valueOf(() -> new byte[logEntryLimit + 8]); this.unsafeFlush = RaftServerConfigKeys.Log.unsafeFlushEnabled(properties); this.asyncFlush = RaftServerConfigKeys.Log.asyncFlushEnabled(properties); if (asyncFlush && unsafeFlush) { @@ -750,6 +754,6 @@ class SegmentedRaftLogWorker { private void allocateSegmentedRaftLogOutputStream(File file, boolean append) throws IOException { Preconditions.assertTrue(out == null && writeBuffer.position() == 0); out = new SegmentedRaftLogOutputStream(file, append, segmentMaxSize, - preallocatedSize, writeBuffer); + preallocatedSize, writeBuffer, sharedBuffer); } }
