This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch snapshot-3
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit dd24cb0630a0ebf8cccb40aa77762167b0e1f550
Author: Duong Nguyen <[email protected]>
AuthorDate: Fri Jun 21 09:46:36 2024 -0700

    RATIS-2114. Corruption due to SegmentedRaftLogWorker queue LogEntry without 
reference counter (#1113)
---
 .../ratis/server/raftlog/segmented/SegmentedRaftLog.java  |  3 +++
 .../server/raftlog/segmented/SegmentedRaftLogWorker.java  | 15 +++++++++++++++
 2 files changed, 18 insertions(+)

diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index def472a60..3467efeb3 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -101,6 +101,9 @@ public final class SegmentedRaftLog extends RaftLogBase {
       completeFuture();
     }
 
+    void discard() {
+    }
+
     final void completeFuture() {
       final boolean completed = future.complete(getEndIndex());
       Preconditions.assertTrue(completed,
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index bfeca7f87..6da3e0045 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -284,6 +284,7 @@ class SegmentedRaftLogWorker {
         LOG.error("Failed to add IO task {}", task, e);
         Optional.ofNullable(server).ifPresent(RaftServer.Division::close);
       }
+      task.discard();
     }
     task.startTimerOnEnqueue(raftLogMetrics.getEnqueuedTimer());
     return task;
@@ -488,6 +489,7 @@ class SegmentedRaftLogWorker {
     private final LogEntryProto entry;
     private final CompletableFuture<?> stateMachineFuture;
     private final CompletableFuture<Long> combined;
+    private final ReferenceCountedObject<LogEntryProto> ref;
 
     WriteLog(LogEntryProto entry, LogEntryProto removedStateMachineData, 
TransactionContext context) {
       this.entry = removedStateMachineData;
@@ -501,6 +503,8 @@ class SegmentedRaftLogWorker {
         } else {
           this.stateMachineFuture = null;
         }
+        entryRef.retain();
+        this.ref = entryRef;
       } else {
         try {
           // this.entry != entry iff the entry has state machine data
@@ -510,6 +514,7 @@ class SegmentedRaftLogWorker {
               + ", entry=" + LogProtoUtils.toLogEntryString(entry, 
stateMachine::toStateMachineLogEntryString), e);
           throw e;
         }
+        this.ref = null;
       }
       this.combined = stateMachineFuture == null? super.getFuture()
           : super.getFuture().thenCombine(stateMachineFuture, (index, 
stateMachineResult) -> index);
@@ -534,6 +539,16 @@ class SegmentedRaftLogWorker {
     @Override
     void done() {
       writeTasks.offerOrCompleteFuture(this);
+      if (ref != null) {
+        ref.release();
+      }
+    }
+
+    @Override
+    void discard() {
+      if (ref != null) {
+        ref.release();
+      }
     }
 
     @Override

Reply via email to