This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 918c8d81e RATIS-1743. Memory leak in SegmentedRaftLogWorker due to metrics. (#784)
918c8d81e is described below

commit 918c8d81ea25bd31cc96374feb26e90cb57d0f61
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Tue Nov 15 20:12:37 2022 -0800

    RATIS-1743. Memory leak in SegmentedRaftLogWorker due to metrics. (#784)
    
    (cherry picked from commit 44b6ef6bed0f57c42f7af64b7d391f4eb3f88cc7)
---
 .../ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index cdeaf4fdd..dad94db93 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -18,7 +18,6 @@
 package org.apache.ratis.server.raftlog.segmented;
 
 import org.apache.ratis.metrics.Timekeeper;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
 import org.apache.ratis.protocol.ClientInvocationId;
@@ -50,6 +49,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
@@ -149,7 +149,7 @@ class SegmentedRaftLogWorker {
   private final StateMachine stateMachine;
   private final SegmentedRaftLogMetrics raftLogMetrics;
   private final ByteBuffer writeBuffer;
-  private final Supplier<byte[]> sharedBuffer;
+  private final AtomicReference<byte[]> sharedBuffer;
 
   /**
    * The number of entries that have been written into the SegmentedRaftLogOutputStream but
@@ -210,7 +210,7 @@ class SegmentedRaftLogWorker {
     this.writeBuffer = ByteBuffer.allocateDirect(bufferSize);
     final int logEntryLimit = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties).getSizeInt();
     // 4 bytes (serialized size) + logEntryLimit + 4 bytes (checksum)
-    this.sharedBuffer = MemoizedSupplier.valueOf(() -> new byte[logEntryLimit + 8]);
+    this.sharedBuffer = new AtomicReference<>(new byte[logEntryLimit + 8]);
     this.unsafeFlush = RaftServerConfigKeys.Log.unsafeFlushEnabled(properties);
     this.asyncFlush = RaftServerConfigKeys.Log.asyncFlushEnabled(properties);
     if (asyncFlush && unsafeFlush) {
@@ -235,6 +235,7 @@ class SegmentedRaftLogWorker {
 
   void close() {
     this.running = false;
+    sharedBuffer.set(null);
     workerThread.interrupt();
     Optional.ofNullable(flushExecutor).ifPresent(ExecutorService::shutdown);
     try {
@@ -355,7 +356,6 @@ class SegmentedRaftLogWorker {
     return pendingFlushNum > 0 && queue.isEmpty();
   }
 
-  @SuppressFBWarnings("NP_NULL_PARAM_DEREF")
   private void flushIfNecessary() throws IOException {
     if (shouldFlush()) {
       raftLogMetrics.onRaftLogFlush();
@@ -733,6 +733,6 @@ class SegmentedRaftLogWorker {
   private void allocateSegmentedRaftLogOutputStream(File file, boolean append) throws IOException {
     Preconditions.assertTrue(out == null && writeBuffer.position() == 0);
     out = new SegmentedRaftLogOutputStream(file, append, segmentMaxSize,
-            preallocatedSize, writeBuffer, sharedBuffer);
+        preallocatedSize, writeBuffer, sharedBuffer::get);
   }
 }

Reply via email to