This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 34fa62ce7 RATIS-2114. Corruption due to SegmentedRaftLogWorker queue LogEntry without reference counter (#1113)
34fa62ce7 is described below

commit 34fa62ce7a59decf3fc8671f730f100e9bfe3ef6
Author: Duong Nguyen <[email protected]>
AuthorDate: Fri Jun 21 09:46:36 2024 -0700

    RATIS-2114. Corruption due to SegmentedRaftLogWorker queue LogEntry without reference counter (#1113)
---
 .../ratis/server/raftlog/segmented/SegmentedRaftLog.java  |  3 +++
 .../server/raftlog/segmented/SegmentedRaftLogWorker.java  | 15 +++++++++++++++
 2 files changed, 18 insertions(+)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 27b771ff1..79f0380ee 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -101,6 +101,9 @@ public final class SegmentedRaftLog extends RaftLogBase {
       completeFuture();
     }
 
+    void discard() {
+    }
+
     final void completeFuture() {
       final boolean completed = future.complete(getEndIndex());
       Preconditions.assertTrue(completed,
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index cd15ea5ef..b9d1442a0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -284,6 +284,7 @@ class SegmentedRaftLogWorker {
         LOG.error("Failed to add IO task {}", task, e);
         Optional.ofNullable(server).ifPresent(RaftServer.Division::close);
       }
+      task.discard();
     }
     task.startTimerOnEnqueue(raftLogMetrics.getEnqueuedTimer());
     return task;
@@ -489,6 +490,7 @@ class SegmentedRaftLogWorker {
     private final LogEntryProto entry;
     private final CompletableFuture<?> stateMachineFuture;
     private final CompletableFuture<Long> combined;
+    private final ReferenceCountedObject<LogEntryProto> ref;
 
     WriteLog(ReferenceCountedObject<LogEntryProto> entryRef, LogEntryProto removedStateMachineData,
         TransactionContext context) {
@@ -505,6 +507,8 @@ class SegmentedRaftLogWorker {
         } else {
           this.stateMachineFuture = null;
         }
+        entryRef.retain();
+        this.ref = entryRef;
       } else {
         try {
           // this.entry != origEntry if it has state machine data
@@ -514,6 +518,7 @@ class SegmentedRaftLogWorker {
               + ", entry=" + LogProtoUtils.toLogEntryString(origEntry, stateMachine::toStateMachineLogEntryString), e);
           throw e;
         }
+        this.ref = null;
       }
       this.combined = stateMachineFuture == null? super.getFuture()
           : super.getFuture().thenCombine(stateMachineFuture, (index, stateMachineResult) -> index);
@@ -538,6 +543,16 @@ class SegmentedRaftLogWorker {
     @Override
     void done() {
       writeTasks.offerOrCompleteFuture(this);
+      if (ref != null) {
+        ref.release();
+      }
+    }
+
+    @Override
+    void discard() {
+      if (ref != null) {
+        ref.release();
+      }
     }
 
     @Override

Reply via email to