This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 44b6ef6be RATIS-1743. Memory leak in SegmentedRaftLogWorker due to
metrics. (#784)
44b6ef6be is described below
commit 44b6ef6bed0f57c42f7af64b7d391f4eb3f88cc7
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Tue Nov 15 20:12:37 2022 -0800
RATIS-1743. Memory leak in SegmentedRaftLogWorker due to metrics. (#784)
---
.../ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 7bc673a79..7e49522fb 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -19,7 +19,6 @@ package org.apache.ratis.server.raftlog.segmented;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Timer;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
import org.apache.ratis.protocol.ClientInvocationId;
@@ -51,6 +50,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
@@ -154,7 +154,7 @@ class SegmentedRaftLogWorker {
private final Timer raftLogEnqueueingDelayTimer;
private final SegmentedRaftLogMetrics raftLogMetrics;
private final ByteBuffer writeBuffer;
- private final Supplier<byte[]> sharedBuffer;
+ private final AtomicReference<byte[]> sharedBuffer;
/**
* The number of entries that have been written into the
SegmentedRaftLogOutputStream but
@@ -220,7 +220,7 @@ class SegmentedRaftLogWorker {
this.writeBuffer = ByteBuffer.allocateDirect(bufferSize);
final int logEntryLimit =
RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties).getSizeInt();
// 4 bytes (serialized size) + logEntryLimit + 4 bytes (checksum)
- this.sharedBuffer = MemoizedSupplier.valueOf(() -> new byte[logEntryLimit
+ 8]);
+ this.sharedBuffer = new AtomicReference<>(new byte[logEntryLimit + 8]);
this.unsafeFlush = RaftServerConfigKeys.Log.unsafeFlushEnabled(properties);
this.asyncFlush = RaftServerConfigKeys.Log.asyncFlushEnabled(properties);
if (asyncFlush && unsafeFlush) {
@@ -245,6 +245,7 @@ class SegmentedRaftLogWorker {
void close() {
this.running = false;
+ sharedBuffer.set(null);
workerThread.interrupt();
Optional.ofNullable(flushExecutor).ifPresent(ExecutorService::shutdown);
try {
@@ -368,7 +369,6 @@ class SegmentedRaftLogWorker {
return pendingFlushNum > 0 && queue.isEmpty();
}
- @SuppressFBWarnings("NP_NULL_PARAM_DEREF")
private void flushIfNecessary() throws IOException {
if (shouldFlush()) {
raftLogMetrics.onRaftLogFlush();
@@ -754,6 +754,6 @@ class SegmentedRaftLogWorker {
private void allocateSegmentedRaftLogOutputStream(File file, boolean append)
throws IOException {
Preconditions.assertTrue(out == null && writeBuffer.position() == 0);
out = new SegmentedRaftLogOutputStream(file, append, segmentMaxSize,
- preallocatedSize, writeBuffer, sharedBuffer);
+ preallocatedSize, writeBuffer, sharedBuffer::get);
}
}