This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f04919ff RATIS-2077. Timedout StateMachine retainRead is released twice (#1081)
7f04919ff is described below

commit 7f04919ffe99a24ea4cb226f27f7c97ba58c2417
Author: Duong Nguyen <[email protected]>
AuthorDate: Sat May 11 14:13:34 2024 -0700

    RATIS-2077. Timedout StateMachine retainRead is released twice (#1081)
---
 .../ratis/server/leader/LogAppenderBase.java       | 25 ++++++++++++++++------
 .../apache/ratis/server/raftlog/RaftLogBase.java   | 11 ----------
 2 files changed, 18 insertions(+), 18 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index de221432b..f4c332f99 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -42,6 +42,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -271,14 +272,24 @@ public abstract class LogAppenderBase implements LogAppender {
       return null;
     }
 
-    final List<LogEntryProto> protos = 
buffer.pollList(getHeartbeatWaitTimeMs(), EntryWithData::getEntry,
-        (entry, time, exception) -> LOG.warn("Failed to get " + entry
-            + " in " + time.toString(TimeUnit.MILLISECONDS, 3), exception));
-    for (EntryWithData entry : buffer) {
-      // Release remaining entries.
-      offered.remove(entry.getIndex()).release();
+    final List<LogEntryProto> protos;
+    try {
+      protos = buffer.pollList(getHeartbeatWaitTimeMs(), 
EntryWithData::getEntry,
+          (entry, time, exception) -> LOG.warn("Failed to get {} in {}",
+              entry, time.toString(TimeUnit.MILLISECONDS, 3), exception));
+    } catch (RaftLogIOException e) {
+      for (ReferenceCountedObject<EntryWithData> ref : offered.values()) {
+        ref.release();
+      }
+      offered.clear();
+      throw e;
+    } finally {
+      for (EntryWithData entry : buffer) {
+        // Release remaining entries.
+        
Optional.ofNullable(offered.remove(entry.getIndex())).ifPresent(ReferenceCountedObject::release);
+      }
+      buffer.clear();
     }
-    buffer.clear();
     assertProtos(protos, followerNext, previous, snapshotIndex);
     AppendEntriesRequestProto appendEntriesProto =
         leaderState.newAppendEntriesRequestProto(follower, protos, previous, 
callId);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
index 3b9724cc6..064e509fd 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
@@ -500,13 +500,11 @@ public abstract class RaftLogBase implements RaftLog {
         if (timeout.compareTo(stateMachineDataReadTimeout) > 0) {
           getRaftLogMetrics().onStateMachineDataReadTimeout();
         }
-        discardData();
         throw t;
       } catch (Exception e) {
         if (e instanceof InterruptedException) {
           Thread.currentThread().interrupt();
         }
-        discardData();
         final String err = getName() + ": Failed readStateMachineData for " + 
this;
         LOG.error(err, e);
         throw new RaftLogIOException(err, 
JavaUtils.unwrapCompletionException(e));
@@ -516,20 +514,11 @@ public abstract class RaftLogBase implements RaftLog {
       if (LogProtoUtils.isStateMachineDataEmpty(entryProto)) {
         final String err = getName() + ": State machine data not set for " + 
this;
         LOG.error(err);
-        data.release();
         throw new RaftLogIOException(err);
       }
       return entryProto;
     }
 
-    private void discardData() {
-      future.whenComplete((r, ex) -> {
-        if (r != null) {
-          r.release();
-        }
-      });
-    }
-
     @Override
     public String toString() {
       return toLogEntryString(logEntry);

Reply via email to