This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 34fa62ce7 RATIS-2114. Corruption due to SegmentedRaftLogWorker queue
LogEntry without reference counter (#1113)
34fa62ce7 is described below
commit 34fa62ce7a59decf3fc8671f730f100e9bfe3ef6
Author: Duong Nguyen <[email protected]>
AuthorDate: Fri Jun 21 09:46:36 2024 -0700
RATIS-2114. Corruption due to SegmentedRaftLogWorker queue LogEntry without
reference counter (#1113)
---
.../ratis/server/raftlog/segmented/SegmentedRaftLog.java | 3 +++
.../server/raftlog/segmented/SegmentedRaftLogWorker.java | 15 +++++++++++++++
2 files changed, 18 insertions(+)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 27b771ff1..79f0380ee 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -101,6 +101,9 @@ public final class SegmentedRaftLog extends RaftLogBase {
completeFuture();
}
+ void discard() {
+ }
+
final void completeFuture() {
final boolean completed = future.complete(getEndIndex());
Preconditions.assertTrue(completed,
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index cd15ea5ef..b9d1442a0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -284,6 +284,7 @@ class SegmentedRaftLogWorker {
LOG.error("Failed to add IO task {}", task, e);
Optional.ofNullable(server).ifPresent(RaftServer.Division::close);
}
+ task.discard();
}
task.startTimerOnEnqueue(raftLogMetrics.getEnqueuedTimer());
return task;
@@ -489,6 +490,7 @@ class SegmentedRaftLogWorker {
private final LogEntryProto entry;
private final CompletableFuture<?> stateMachineFuture;
private final CompletableFuture<Long> combined;
+ private final ReferenceCountedObject<LogEntryProto> ref;
WriteLog(ReferenceCountedObject<LogEntryProto> entryRef, LogEntryProto
removedStateMachineData,
TransactionContext context) {
@@ -505,6 +507,8 @@ class SegmentedRaftLogWorker {
} else {
this.stateMachineFuture = null;
}
+ entryRef.retain();
+ this.ref = entryRef;
} else {
try {
// this.entry != origEntry if it has state machine data
@@ -514,6 +518,7 @@ class SegmentedRaftLogWorker {
+ ", entry=" + LogProtoUtils.toLogEntryString(origEntry,
stateMachine::toStateMachineLogEntryString), e);
throw e;
}
+ this.ref = null;
}
this.combined = stateMachineFuture == null? super.getFuture()
: super.getFuture().thenCombine(stateMachineFuture, (index,
stateMachineResult) -> index);
@@ -538,6 +543,16 @@ class SegmentedRaftLogWorker {
@Override
void done() {
writeTasks.offerOrCompleteFuture(this);
+ if (ref != null) {
+ ref.release();
+ }
+ }
+
+ @Override
+ void discard() {
+ if (ref != null) {
+ ref.release();
+ }
}
@Override