This is an automated email from the ASF dual-hosted git repository. dragonyliu pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 343ed28ae5ae1272beebf5be98224e1103e56db6 Author: William Song <[email protected]> AuthorDate: Sat Aug 27 00:58:21 2022 +0800 RATIS-1671. Add manual trigger snapshot (#712) (cherry picked from commit eaf9541af05af06e9e0077572c8544c4485b1503) --- .../apache/ratis/grpc/server/GrpcLogAppender.java | 9 +++++--- .../apache/ratis/server/leader/LogAppender.java | 3 +++ .../ratis/server/leader/LogAppenderBase.java | 26 ++++++++++++++++++++++ .../ratis/server/leader/LogAppenderDefault.java | 1 + 4 files changed, 36 insertions(+), 3 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index d9bcb33d1..e87edac5e 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -281,9 +281,12 @@ public class GrpcLogAppender extends LogAppenderBase { CodeInjectionForTesting.execute(GrpcService.GRPC_SEND_SERVER_REQUEST, getServer().getId(), null, proto); request.startRequestTimer(); - boolean sent = Optional.ofNullable(appendLogRequestObserver).map(observer -> { - observer.onNext(proto); - return true;}).isPresent(); + resetHeartbeatTrigger(); + final boolean sent = Optional.ofNullable(appendLogRequestObserver) + .map(observer -> { + observer.onNext(proto); + return true; + }).isPresent(); if (sent) { scheduler.onTimeout(requestTimeoutDuration, diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java index ef5e1a7ed..f0ff28690 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java @@ -166,6 +166,9 @@ public interface LogAppender { return getFollower().getNextIndex() < getRaftLog().getNextIndex(); } + /** send a heartbeat AppendEntries immediately */ + void triggerHeartbeat() throws IOException; + /** @return the wait time in milliseconds to send the next heartbeat. */ default long getHeartbeatWaitTimeMs() { final int min = getServer().properties().minRpcTimeoutMs(); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java index 50f9887fd..fda78fbcf 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java @@ -34,9 +34,11 @@ import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.SizeInBytes; +import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; /** * An abstract implementation of {@link LogAppender}. @@ -53,6 +55,8 @@ public abstract class LogAppenderBase implements LogAppender { private final LogAppenderDaemon daemon; private final AwaitForSignal eventAwaitForSignal; + private final AtomicBoolean heartbeatTrigger = new AtomicBoolean(); + protected LogAppenderBase(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) { this.follower = f; this.name = follower.getName() + "-" + JavaUtils.getClassSimpleName(getClass()); @@ -69,6 +73,28 @@ public abstract class LogAppenderBase implements LogAppender { this.eventAwaitForSignal = new AwaitForSignal(name); } + @Override + public void triggerHeartbeat() throws IOException { + if (heartbeatTrigger.compareAndSet(false, true)) { + notifyLogAppender(); + } + } + + protected void resetHeartbeatTrigger() { + heartbeatTrigger.set(false); + } + + @Override + public boolean shouldSendAppendEntries() { + return heartbeatTrigger.get() || LogAppender.super.shouldSendAppendEntries(); + } + + @Override + public long getHeartbeatWaitTimeMs() { + return heartbeatTrigger.get() ? 0 : + LogAppender.super.getHeartbeatWaitTimeMs(); + } + @Override public AwaitForSignal getEventAwaitForSignal() { return eventAwaitForSignal; diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java index 0c91427e5..0a4c12ce7 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java @@ -72,6 +72,7 @@ class LogAppenderDefault extends LogAppenderBase { return null; } + resetHeartbeatTrigger(); getFollower().updateLastRpcSendTime(request.getEntriesCount() == 0); final AppendEntriesReplyProto r = getServerRpc().appendEntries(request); getFollower().updateLastRpcResponseTime();
