This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new b210965db RATIS-2184. Improve TestRaftWithGrpc test stability (#1177)
b210965db is described below
commit b210965db17cc255a0d97d4e3d7450145063425a
Author: jianghuazhu <[email protected]>
AuthorDate: Tue Dec 24 03:57:55 2024 +0800
RATIS-2184. Improve TestRaftWithGrpc test stability (#1177)
---
.../org/apache/ratis/grpc/server/GrpcLogAppender.java | 13 +++++++++++--
.../org/apache/ratis/server/leader/LogAppenderBase.java | 17 ++++++++++++-----
.../ratis/server/raftlog/segmented/LogSegment.java | 14 +++++++++-----
.../org/apache/ratis/server/impl/MiniRaftCluster.java | 14 ++++++--------
4 files changed, 38 insertions(+), 20 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 0784eaf04..1544975a4 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -302,8 +302,14 @@ public class GrpcLogAppender extends LogAppenderBase {
@Override
public CompletableFuture<LifeCycle.State> stopAsync() {
- grpcServerMetrics.unregister();
- return super.stopAsync();
+ try (AutoCloseableLock ignored = lock.writeLock(caller, LOG::trace)) {
+ if (appendLogRequestObserver != null) {
+ appendLogRequestObserver.stop();
+ appendLogRequestObserver = null;
+ }
+ grpcServerMetrics.unregister();
+ return super.stopAsync();
+ }
}
@Override
@@ -382,6 +388,9 @@ public class GrpcLogAppender extends LogAppenderBase {
final ReferenceCountedObject<AppendEntriesRequestProto> pending;
final AppendEntriesRequest request;
try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
+ if (!isRunning()) {
+ return;
+ }
// Prepare and send the append request.
// Note changes on follower's nextIndex and ops on pendingRequests
should always be done under the write-lock
pending = nextAppendEntriesRequest(callId.getAndIncrement(), heartbeat);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index d1557588d..e6c92ce04 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -261,12 +261,19 @@ public abstract class LogAppenderBase implements LogAppender {
final long halfMs = heartbeatWaitTimeMs/2;
final Map<Long, ReferenceCountedObject<EntryWithData>> offered = new
HashMap<>();
for (long next = followerNext; leaderNext > next &&
getHeartbeatWaitTimeMs() - halfMs > 0; next++) {
- final ReferenceCountedObject<EntryWithData> entryWithData =
getRaftLog().retainEntryWithData(next);
- if (!buffer.offer(entryWithData.get())) {
- entryWithData.release();
- break;
+ ReferenceCountedObject<EntryWithData> entryWithData = null;
+ try {
+ entryWithData = getRaftLog().retainEntryWithData(next);
+ if (!buffer.offer(entryWithData.get())) {
+ entryWithData.release();
+ break;
+ }
+ offered.put(next, entryWithData);
+ } catch (Exception e){
+ if (entryWithData != null) {
+ entryWithData.release();
+ }
}
- offered.put(next, entryWithData);
}
if (buffer.isEmpty()) {
return null;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index 2eea0f90f..ef066baac 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -282,11 +282,15 @@ public final class LogSegment {
final LogSegmentStartEnd startEnd =
LogSegmentStartEnd.valueOf(startIndex, endIndex, isOpen);
readSegmentFile(file, startEnd, maxOpSize, getLogCorruptionPolicy(),
raftLogMetrics, entryRef -> {
final LogEntryProto entry = entryRef.retain();
- final TermIndex ti = TermIndex.valueOf(entry);
- putEntryCache(ti, entryRef, Op.LOAD_SEGMENT_FILE);
- if (ti.equals(key.getTermIndex())) {
- toReturn.set(entryRef);
- } else {
+ try {
+ final TermIndex ti = TermIndex.valueOf(entry);
+ putEntryCache(ti, entryRef, Op.LOAD_SEGMENT_FILE);
+ if (ti.equals(key.getTermIndex())) {
+ toReturn.set(entryRef);
+ } else {
+ entryRef.release();
+ }
+ } catch (Exception e) {
entryRef.release();
}
});
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index 86ebfa52c..e180f3ecb 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -859,12 +859,11 @@ public abstract class MiniRaftCluster implements Closeable {
getServers().forEach(proxy -> executor.submit(() ->
JavaUtils.runAsUnchecked(proxy::close)));
final int maxRetries = 30;
final TimeDuration retrySleep = TimeDuration.ONE_SECOND;
- try {
- executor.shutdown();
- // just wait for a few seconds
- boolean terminated = false;
+ executor.shutdown();
+ boolean terminated = false;
- for(int i = 0; i < maxRetries && !terminated; ) {
+ for(int i = 0; i < maxRetries && !terminated; ) {
+ try {
terminated = executor.awaitTermination(retrySleep.getDuration(),
retrySleep.getUnit());
if (!terminated) {
i++;
@@ -874,10 +873,9 @@ public abstract class MiniRaftCluster implements Closeable {
LOG.error("Failed to shutdown executor, some servers may be still
running:\n{}", printServers());
}
}
- }
- } catch (InterruptedException e) {
+ } catch (InterruptedException e) {
LOG.warn("shutdown interrupted", e);
- Thread.currentThread().interrupt();
+ }
}
Optional.ofNullable(timer.get()).ifPresent(Timer::cancel);