This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch snapshot-3 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit dd24cb0630a0ebf8cccb40aa77762167b0e1f550 Author: Duong Nguyen <[email protected]> AuthorDate: Fri Jun 21 09:46:36 2024 -0700 RATIS-2114. Corruption due to SegmentedRaftLogWorker queue LogEntry without reference counter (#1113) --- .../ratis/server/raftlog/segmented/SegmentedRaftLog.java | 3 +++ .../server/raftlog/segmented/SegmentedRaftLogWorker.java | 15 +++++++++++++++ 2 files changed, 18 insertions(+) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java index def472a60..3467efeb3 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java @@ -101,6 +101,9 @@ public final class SegmentedRaftLog extends RaftLogBase { completeFuture(); } + void discard() { + } + final void completeFuture() { final boolean completed = future.complete(getEndIndex()); Preconditions.assertTrue(completed, diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java index bfeca7f87..6da3e0045 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java @@ -284,6 +284,7 @@ class SegmentedRaftLogWorker { LOG.error("Failed to add IO task {}", task, e); Optional.ofNullable(server).ifPresent(RaftServer.Division::close); } + task.discard(); } task.startTimerOnEnqueue(raftLogMetrics.getEnqueuedTimer()); return task; @@ -488,6 +489,7 @@ class SegmentedRaftLogWorker { private final LogEntryProto entry; private final CompletableFuture<?> stateMachineFuture; private final CompletableFuture<Long> combined; + private final ReferenceCountedObject<LogEntryProto> ref; WriteLog(LogEntryProto entry, LogEntryProto removedStateMachineData, TransactionContext context) { this.entry = removedStateMachineData; @@ -501,6 +503,8 @@ class SegmentedRaftLogWorker { } else { this.stateMachineFuture = null; } + entryRef.retain(); + this.ref = entryRef; } else { try { // this.entry != entry iff the entry has state machine data @@ -510,6 +514,7 @@ class SegmentedRaftLogWorker { + ", entry=" + LogProtoUtils.toLogEntryString(entry, stateMachine::toStateMachineLogEntryString), e); throw e; } + this.ref = null; } this.combined = stateMachineFuture == null? super.getFuture() : super.getFuture().thenCombine(stateMachineFuture, (index, stateMachineResult) -> index); @@ -534,6 +539,16 @@ class SegmentedRaftLogWorker { @Override void done() { writeTasks.offerOrCompleteFuture(this); + if (ref != null) { + ref.release(); + } + } + + @Override + void discard() { + if (ref != null) { + ref.release(); + } } @Override
