This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new b210965db RATIS-2184. Improve TestRaftWithGrpc test stability (#1177)
b210965db is described below

commit b210965db17cc255a0d97d4e3d7450145063425a
Author: jianghuazhu <[email protected]>
AuthorDate: Tue Dec 24 03:57:55 2024 +0800

    RATIS-2184. Improve TestRaftWithGrpc test stability (#1177)
---
 .../org/apache/ratis/grpc/server/GrpcLogAppender.java   | 13 +++++++++++--
 .../org/apache/ratis/server/leader/LogAppenderBase.java | 17 ++++++++++++-----
 .../ratis/server/raftlog/segmented/LogSegment.java      | 14 +++++++++-----
 .../org/apache/ratis/server/impl/MiniRaftCluster.java   | 14 ++++++--------
 4 files changed, 38 insertions(+), 20 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 0784eaf04..1544975a4 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -302,8 +302,14 @@ public class GrpcLogAppender extends LogAppenderBase {
 
   @Override
   public CompletableFuture<LifeCycle.State> stopAsync() {
-    grpcServerMetrics.unregister();
-    return super.stopAsync();
+    try (AutoCloseableLock ignored = lock.writeLock(caller, LOG::trace)) {
+      if (appendLogRequestObserver != null) {
+        appendLogRequestObserver.stop();
+        appendLogRequestObserver = null;
+      }
+      grpcServerMetrics.unregister();
+      return super.stopAsync();
+    }
   }
 
   @Override
@@ -382,6 +388,9 @@ public class GrpcLogAppender extends LogAppenderBase {
     final ReferenceCountedObject<AppendEntriesRequestProto> pending;
     final AppendEntriesRequest request;
     try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
+      if (!isRunning()) {
+        return;
+      }
       // Prepare and send the append request.
       // Note changes on follower's nextIndex and ops on pendingRequests 
should always be done under the write-lock
       pending = nextAppendEntriesRequest(callId.getAndIncrement(), heartbeat);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index d1557588d..e6c92ce04 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -261,12 +261,19 @@ public abstract class LogAppenderBase implements LogAppender {
     final long halfMs = heartbeatWaitTimeMs/2;
     final Map<Long, ReferenceCountedObject<EntryWithData>> offered = new 
HashMap<>();
     for (long next = followerNext; leaderNext > next && 
getHeartbeatWaitTimeMs() - halfMs > 0; next++) {
-      final ReferenceCountedObject<EntryWithData> entryWithData = 
getRaftLog().retainEntryWithData(next);
-      if (!buffer.offer(entryWithData.get())) {
-        entryWithData.release();
-        break;
+      ReferenceCountedObject<EntryWithData> entryWithData = null;
+      try {
+        entryWithData = getRaftLog().retainEntryWithData(next);
+        if (!buffer.offer(entryWithData.get())) {
+          entryWithData.release();
+          break;
+        }
+        offered.put(next, entryWithData);
+      } catch (Exception e){
+        if (entryWithData != null) {
+          entryWithData.release();
+        }
       }
-      offered.put(next, entryWithData);
     }
     if (buffer.isEmpty()) {
       return null;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index 2eea0f90f..ef066baac 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -282,11 +282,15 @@ public final class LogSegment {
       final LogSegmentStartEnd startEnd = 
LogSegmentStartEnd.valueOf(startIndex, endIndex, isOpen);
       readSegmentFile(file, startEnd, maxOpSize, getLogCorruptionPolicy(), 
raftLogMetrics, entryRef -> {
         final LogEntryProto entry = entryRef.retain();
-        final TermIndex ti = TermIndex.valueOf(entry);
-        putEntryCache(ti, entryRef, Op.LOAD_SEGMENT_FILE);
-        if (ti.equals(key.getTermIndex())) {
-          toReturn.set(entryRef);
-        } else {
+        try {
+          final TermIndex ti = TermIndex.valueOf(entry);
+          putEntryCache(ti, entryRef, Op.LOAD_SEGMENT_FILE);
+          if (ti.equals(key.getTermIndex())) {
+            toReturn.set(entryRef);
+          } else {
+            entryRef.release();
+          }
+        } catch (Exception e) {
           entryRef.release();
         }
       });
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index 86ebfa52c..e180f3ecb 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -859,12 +859,11 @@ public abstract class MiniRaftCluster implements Closeable {
     getServers().forEach(proxy -> executor.submit(() -> 
JavaUtils.runAsUnchecked(proxy::close)));
     final int maxRetries = 30;
     final TimeDuration retrySleep = TimeDuration.ONE_SECOND;
-    try {
-      executor.shutdown();
-      // just wait for a few seconds
-      boolean terminated = false;
+    executor.shutdown();
+    boolean terminated = false;
 
-      for(int i = 0; i < maxRetries && !terminated; ) {
+    for(int i = 0; i < maxRetries && !terminated; ) {
+      try {
         terminated = executor.awaitTermination(retrySleep.getDuration(), 
retrySleep.getUnit());
         if (!terminated) {
           i++;
@@ -874,10 +873,9 @@ public abstract class MiniRaftCluster implements Closeable {
             LOG.error("Failed to shutdown executor, some servers may be still 
running:\n{}", printServers());
           }
         }
-      }
-    } catch (InterruptedException e) {
+      } catch (InterruptedException e) {
       LOG.warn("shutdown interrupted", e);
-      Thread.currentThread().interrupt();
+      }
     }
 
     Optional.ofNullable(timer.get()).ifPresent(Timer::cancel);

Reply via email to