This is an automated email from the ASF dual-hosted git repository.

dragonyliu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 5a03548bbd6c172c83930d5db4352fa435d6826d
Author: William Song <[email protected]>
AuthorDate: Thu Oct 13 18:25:53 2022 +0800

    RATIS-1717. Perf: Use global serialize buf to avoid temp buf (#762)
    
    (cherry picked from commit 2753f3d6207a88f5a5f193a28371286db4e8b398)
---
 .../raftlog/segmented/BufferedWriteChannel.java     |  7 +++++--
 .../segmented/SegmentedRaftLogOutputStream.java     | 21 ++++++++++++++++-----
 .../raftlog/segmented/SegmentedRaftLogWorker.java   |  6 +++++-
 3 files changed, 26 insertions(+), 8 deletions(-)

diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
index f6c10ea41..ef9987ff7 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
@@ -63,9 +63,12 @@ class BufferedWriteChannel implements Closeable {
   }
 
   void write(byte[] b) throws IOException {
+    write(b, b.length);
+  }
+  void write(byte[] b, int len) throws IOException {
     int offset = 0;
-    while (offset < b.length) {
-      int toPut = Math.min(b.length - offset, writeBuffer.remaining());
+    while (offset < len) {
+      int toPut = Math.min(len - offset, writeBuffer.remaining());
       writeBuffer.put(b, offset, toPut);
       offset += toPut;
       if (writeBuffer.remaining() == 0) {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
index 22eebac93..e0fd41fbd 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
@@ -34,6 +34,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
 import java.util.zip.Checksum;
 
 public class SegmentedRaftLogOutputStream implements Closeable {
@@ -52,6 +53,7 @@ public class SegmentedRaftLogOutputStream implements 
Closeable {
   private final File file;
   private final BufferedWriteChannel out; // buffered FileChannel for writing
   private final Checksum checksum;
+  private final Supplier<byte[]> sharedBuffer;
 
   private final long segmentMaxSize;
   private final long preallocatedSize;
@@ -59,10 +61,17 @@ public class SegmentedRaftLogOutputStream implements 
Closeable {
   public SegmentedRaftLogOutputStream(File file, boolean append, long 
segmentMaxSize,
       long preallocatedSize, ByteBuffer byteBuffer)
       throws IOException {
+    this(file, append, segmentMaxSize, preallocatedSize, byteBuffer, null);
+  }
+
+  SegmentedRaftLogOutputStream(File file, boolean append, long segmentMaxSize,
+      long preallocatedSize, ByteBuffer byteBuffer, Supplier<byte[]> 
sharedBuffer)
+      throws IOException {
     this.file = file;
     this.checksum = new PureJavaCrc32C();
     this.segmentMaxSize = segmentMaxSize;
     this.preallocatedSize = preallocatedSize;
+    this.sharedBuffer = sharedBuffer;
     this.out = BufferedWriteChannel.open(file, append, byteBuffer);
 
     if (!append) {
@@ -75,12 +84,12 @@ public class SegmentedRaftLogOutputStream implements 
Closeable {
 
   /**
    * Write the given entry to this output stream.
-   *
+   * <p>
    * Format:
    *   (1) The serialized size of the entry.
    *   (2) The entry.
    *   (3) 4-byte checksum of the entry.
-   *
+   * <p>
    * Size in bytes to be written:
    *   (size to encode n) + n + (checksum size),
    *   where n is the entry serialized size and the checksum size is 4.
@@ -88,8 +97,10 @@ public class SegmentedRaftLogOutputStream implements 
Closeable {
   public void write(LogEntryProto entry) throws IOException {
     final int serialized = entry.getSerializedSize();
     final int proto = CodedOutputStream.computeUInt32SizeNoTag(serialized) + 
serialized;
-    final byte[] buf = new byte[proto + 4]; // proto and 4-byte checksum
-    preallocateIfNecessary(buf.length);
+    final int total = proto + 4; // proto and 4-byte checksum
+    final byte[] buf = sharedBuffer != null? sharedBuffer.get(): new 
byte[total];
+    Preconditions.assertTrue(total <= buf.length, () -> "total = " + total + " 
> buf.length " + buf.length);
+    preallocateIfNecessary(total);
 
     CodedOutputStream cout = CodedOutputStream.newInstance(buf);
     cout.writeUInt32NoTag(serialized);
@@ -99,7 +110,7 @@ public class SegmentedRaftLogOutputStream implements 
Closeable {
     checksum.update(buf, 0, proto);
     ByteBuffer.wrap(buf, proto, 4).putInt((int) checksum.getValue());
 
-    out.write(buf);
+    out.write(buf, total);
   }
 
   @Override
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 516b01c7b..7bc673a79 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -154,6 +154,7 @@ class SegmentedRaftLogWorker {
   private final Timer raftLogEnqueueingDelayTimer;
   private final SegmentedRaftLogMetrics raftLogMetrics;
   private final ByteBuffer writeBuffer;
+  private final Supplier<byte[]> sharedBuffer;
 
   /**
    * The number of entries that have been written into the 
SegmentedRaftLogOutputStream but
@@ -217,6 +218,9 @@ class SegmentedRaftLogWorker {
 
     final int bufferSize = 
RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
     this.writeBuffer = ByteBuffer.allocateDirect(bufferSize);
+    final int logEntryLimit = 
RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties).getSizeInt();
+    // 4 bytes (serialized size) + logEntryLimit + 4 bytes (checksum)
+    this.sharedBuffer = MemoizedSupplier.valueOf(() -> new byte[logEntryLimit 
+ 8]);
     this.unsafeFlush = RaftServerConfigKeys.Log.unsafeFlushEnabled(properties);
     this.asyncFlush = RaftServerConfigKeys.Log.asyncFlushEnabled(properties);
     if (asyncFlush && unsafeFlush) {
@@ -750,6 +754,6 @@ class SegmentedRaftLogWorker {
   private void allocateSegmentedRaftLogOutputStream(File file, boolean append) 
throws IOException {
     Preconditions.assertTrue(out == null && writeBuffer.position() == 0);
     out = new SegmentedRaftLogOutputStream(file, append, segmentMaxSize,
-            preallocatedSize, writeBuffer);
+            preallocatedSize, writeBuffer, sharedBuffer);
   }
 }

Reply via email to