This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 7f04919ff RATIS-2077. Timedout StateMachine retainRead is released twice (#1081)
7f04919ff is described below
commit 7f04919ffe99a24ea4cb226f27f7c97ba58c2417
Author: Duong Nguyen <[email protected]>
AuthorDate: Sat May 11 14:13:34 2024 -0700
RATIS-2077. Timedout StateMachine retainRead is released twice (#1081)
---
.../ratis/server/leader/LogAppenderBase.java | 25 ++++++++++++++++------
.../apache/ratis/server/raftlog/RaftLogBase.java | 11 ----------
2 files changed, 18 insertions(+), 18 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index de221432b..f4c332f99 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -42,6 +42,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -271,14 +272,24 @@ public abstract class LogAppenderBase implements LogAppender {
return null;
}
- final List<LogEntryProto> protos = buffer.pollList(getHeartbeatWaitTimeMs(), EntryWithData::getEntry,
- (entry, time, exception) -> LOG.warn("Failed to get " + entry
- + " in " + time.toString(TimeUnit.MILLISECONDS, 3), exception));
- for (EntryWithData entry : buffer) {
- // Release remaining entries.
- offered.remove(entry.getIndex()).release();
+ final List<LogEntryProto> protos;
+ try {
+ protos = buffer.pollList(getHeartbeatWaitTimeMs(), EntryWithData::getEntry,
+ (entry, time, exception) -> LOG.warn("Failed to get {} in {}",
+ entry, time.toString(TimeUnit.MILLISECONDS, 3), exception));
+ } catch (RaftLogIOException e) {
+ for (ReferenceCountedObject<EntryWithData> ref : offered.values()) {
+ ref.release();
+ }
+ offered.clear();
+ throw e;
+ } finally {
+ for (EntryWithData entry : buffer) {
+ // Release remaining entries.
+ Optional.ofNullable(offered.remove(entry.getIndex())).ifPresent(ReferenceCountedObject::release);
+ }
+ buffer.clear();
}
- buffer.clear();
assertProtos(protos, followerNext, previous, snapshotIndex);
AppendEntriesRequestProto appendEntriesProto =
leaderState.newAppendEntriesRequestProto(follower, protos, previous, callId);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
index 3b9724cc6..064e509fd 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
@@ -500,13 +500,11 @@ public abstract class RaftLogBase implements RaftLog {
if (timeout.compareTo(stateMachineDataReadTimeout) > 0) {
getRaftLogMetrics().onStateMachineDataReadTimeout();
}
- discardData();
throw t;
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
- discardData();
final String err = getName() + ": Failed readStateMachineData for " + this;
LOG.error(err, e);
throw new RaftLogIOException(err, JavaUtils.unwrapCompletionException(e));
@@ -516,20 +514,11 @@ public abstract class RaftLogBase implements RaftLog {
if (LogProtoUtils.isStateMachineDataEmpty(entryProto)) {
final String err = getName() + ": State machine data not set for " + this;
LOG.error(err);
- data.release();
throw new RaftLogIOException(err);
}
return entryProto;
}
- private void discardData() {
- future.whenComplete((r, ex) -> {
- if (r != null) {
- r.release();
- }
- });
- }
-
@Override
public String toString() {
return toLogEntryString(logEntry);