This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch branch-2_tmp in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 8247d3c6b637d6155c76a03dbb8bd9504cacbdb3 Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Tue Feb 28 05:00:02 2023 -0800 RATIS-1793. Enforce raft.server.log.appender.wait-time.min. (#832) (cherry picked from commit fdd7c5c98beaeed6aa33261752554575282e1a43) --- .../apache/ratis/grpc/server/GrpcLogAppender.java | 30 +++++++++++----------- .../apache/ratis/server/leader/FollowerInfo.java | 3 +++ .../apache/ratis/server/leader/LogAppender.java | 4 +-- .../apache/ratis/server/raftlog/RaftLogIndex.java | 30 ++++++++++++++++------ .../apache/ratis/server/impl/FollowerInfoImpl.java | 14 +++++++--- .../ratis/server/leader/LogAppenderBase.java | 11 ++++++-- 6 files changed, 62 insertions(+), 30 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index 7c742d91d..b76da8ecb 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -48,7 +48,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import com.codahale.metrics.Timer; @@ -74,9 +73,6 @@ public class GrpcLogAppender extends LogAppenderBase { private final TimeDuration requestTimeoutDuration; private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance(); - private final long waitTimeMinMs; - private final AtomicReference<Timestamp> lastAppendEntries; - private volatile StreamObservers appendLogRequestObserver; private final boolean useSeparateHBChannel; @@ -96,10 +92,6 @@ public class GrpcLogAppender extends LogAppenderBase { this.installSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties); this.useSeparateHBChannel = GrpcConfigKeys.Server.heartbeatChannel(properties); - final TimeDuration waitTimeMin = RaftServerConfigKeys.Log.Appender.waitTimeMin(properties); - this.waitTimeMinMs = waitTimeMin.toLong(TimeUnit.MILLISECONDS); - this.lastAppendEntries = new AtomicReference<>(Timestamp.currentTime().addTime(waitTimeMin.negate())); - grpcServerMetrics = new GrpcServerMetrics(server.getMemberId().toString()); grpcServerMetrics.addPendingRequestsCount(getFollowerId().toString(), pendingRequests::logRequestsSize); @@ -182,10 +174,9 @@ public class GrpcLogAppender extends LogAppenderBase { // For normal nodes, new entries should be sent ASAP // however for slow followers (especially when the follower is down), // keep sending without any wait time only ends up in high CPU load - final long min = waitTimeMinMs - lastAppendEntries.get().elapsedTimeMs(); - return Math.max(0L, min); + return Math.max(getMinWaitTimeMs(), 0L); } - return Math.min(waitTimeMinMs, getHeartbeatWaitTimeMs()); + return Math.min(getMinWaitTimeMs(), getHeartbeatWaitTimeMs()); } private boolean isSlowFollower() { @@ -263,13 +254,13 @@ public class GrpcLogAppender extends LogAppenderBase { return CALL_ID_COMPARATOR; } - private void appendLog(boolean excludeLogEntries) throws IOException { + private void appendLog(boolean heartbeat) throws IOException { final AppendEntriesRequestProto pending; final AppendEntriesRequest request; try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) { // Prepare and send the append request. // Note changes on follower's nextIndex and ops on pendingRequests should always be done under the write-lock - pending = newAppendEntriesRequest(callId.getAndIncrement(), excludeLogEntries); + pending = newAppendEntriesRequest(callId.getAndIncrement(), heartbeat); if (pending == null) { return; } @@ -282,6 +273,16 @@ public class GrpcLogAppender extends LogAppenderBase { } } + final long waitMs = getMinWaitTimeMs(); + if (waitMs > 0) { + try { + Thread.sleep(waitMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw IOUtils.toInterruptedIOException( + "Interrupted appendLog, heartbeat? " + heartbeat, e); + } + } if (isRunning()) { sendRequest(request, pending); } @@ -295,15 +296,14 @@ public class GrpcLogAppender extends LogAppenderBase { final boolean sent = Optional.ofNullable(appendLogRequestObserver) .map(observer -> { observer.onNext(proto); - lastAppendEntries.set(Timestamp.currentTime()); return true; }).isPresent(); if (sent) { + getFollower().updateLastRpcSendTime(request.isHeartbeat()); scheduler.onTimeout(requestTimeoutDuration, () -> timeoutAppendRequest(request.getCallId(), request.isHeartbeat()), LOG, () -> "Timeout check failed for append entry request: " + request); - getFollower().updateLastRpcSendTime(request.isHeartbeat()); } else { request.stopRequestTimer(); } diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java index b4ae8458c..fb63068a5 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java @@ -78,6 +78,9 @@ public interface FollowerInfo { /** @return the lastRpcResponseTime . */ Timestamp getLastRpcResponseTime(); + /** @return the lastRpcSendTime . */ + Timestamp getLastRpcSendTime(); + /** Update lastRpcResponseTime to the current time. */ void updateLastRpcResponseTime(); diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java index f0ff28690..49a1a12fa 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java @@ -166,8 +166,8 @@ public interface LogAppender { return getFollower().getNextIndex() < getRaftLog().getNextIndex(); } - /** send a heartbeat AppendEntries immediately */ - void triggerHeartbeat() throws IOException; + /** Trigger to send a heartbeat AppendEntries. */ + void triggerHeartbeat(); /** @return the wait time in milliseconds to send the next heartbeat. */ default long getHeartbeatWaitTimeMs() { diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogIndex.java b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogIndex.java index ed545e291..290a58835 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogIndex.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogIndex.java @@ -44,30 +44,44 @@ public class RaftLogIndex { public boolean setUnconditionally(long newIndex, Consumer<Object> log) { final long old = index.getAndSet(newIndex); - log.accept(StringUtils.stringSupplierAsObject(() -> name + ": setUnconditionally " + old + " -> " + newIndex)); - return old != newIndex; + final boolean updated = old != newIndex; + if (updated) { + log.accept(StringUtils.stringSupplierAsObject( + () -> name + ": setUnconditionally " + old + " -> " + newIndex)); + } + return updated; } public boolean updateUnconditionally(LongUnaryOperator update, Consumer<Object> log) { final long old = index.getAndUpdate(update); final long newIndex = update.applyAsLong(old); - log.accept(StringUtils.stringSupplierAsObject(() -> name + ": updateUnconditionally " + old + " -> " + newIndex)); - return old != newIndex; + final boolean updated = old != newIndex; + if (updated) { + log.accept(StringUtils.stringSupplierAsObject( + () -> name + ": updateUnconditionally " + old + " -> " + newIndex)); + } + return updated; } public boolean updateIncreasingly(long newIndex, Consumer<Object> log) { final long old = index.getAndSet(newIndex); Preconditions.assertTrue(old <= newIndex, () -> "Failed to updateIncreasingly for " + name + ": " + old + " -> " + newIndex); - log.accept(StringUtils.stringSupplierAsObject(() -> name + ": updateIncreasingly " + old + " -> " + newIndex)); - return old != newIndex; + final boolean updated = old != newIndex; + if (updated) { + log.accept(StringUtils.stringSupplierAsObject( + () -> name + ": updateIncreasingly " + old + " -> " + newIndex)); + } + return updated; } public boolean updateToMax(long newIndex, Consumer<Object> log) { final long old = index.getAndUpdate(oldIndex -> Math.max(oldIndex, newIndex)); final boolean updated = old < newIndex; - log.accept(StringUtils.stringSupplierAsObject( - () -> name + ": updateToMax old=" + old + ", new=" + newIndex + ", updated? " + updated)); + if (updated) { + log.accept(StringUtils.stringSupplierAsObject( + () -> name + ": updateToMax old=" + old + ", new=" + newIndex + ", updated? " + updated)); + } return updated; } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java index 0d7fe2075..67af642fd 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java @@ -93,17 +93,20 @@ class FollowerInfoImpl implements FollowerInfo { @Override public void decreaseNextIndex(long newNextIndex) { - nextIndex.updateUnconditionally(old -> old <= 0L? old: Math.min(old - 1, newNextIndex), infoIndexChange); + nextIndex.updateUnconditionally(old -> old <= 0L? old: Math.min(old - 1, newNextIndex), + message -> infoIndexChange.accept("decreaseNextIndex " + message)); } @Override public void setNextIndex(long newNextIndex) { - nextIndex.updateUnconditionally(old -> newNextIndex >= 0 ? newNextIndex : old, infoIndexChange); + nextIndex.updateUnconditionally(old -> newNextIndex >= 0 ? newNextIndex : old, + message -> infoIndexChange.accept("setNextIndex " + message)); } @Override public void updateNextIndex(long newNextIndex) { - nextIndex.updateToMax(newNextIndex, infoIndexChange); + nextIndex.updateToMax(newNextIndex, + message -> infoIndexChange.accept("decreaseNextIndex " + message)); } @Override @@ -160,6 +163,11 @@ class FollowerInfoImpl implements FollowerInfo { return lastRpcResponseTime.get(); } + @Override + public Timestamp getLastRpcSendTime() { + return lastRpcSendTime.get(); + } + @Override public void updateLastRpcSendTime(boolean isHeartbeat) { final Timestamp currentTime = Timestamp.currentTime(); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java index bc8a31181..1c0f61836 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java @@ -34,10 +34,10 @@ import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.SizeInBytes; -import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -56,6 +56,7 @@ public abstract class LogAppenderBase implements LogAppender { private final AwaitForSignal eventAwaitForSignal; private final AtomicBoolean heartbeatTrigger = new AtomicBoolean(); + private final long waitTimeMinMs; protected LogAppenderBase(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) { this.follower = f; @@ -71,10 +72,12 @@ public abstract class LogAppenderBase implements LogAppender { this.buffer = new DataQueue<>(this, bufferByteLimit, bufferElementLimit, EntryWithData::getSerializedSize); this.daemon = new LogAppenderDaemon(this); this.eventAwaitForSignal = new AwaitForSignal(name); + + this.waitTimeMinMs = RaftServerConfigKeys.Log.Appender.waitTimeMin(properties).toLong(TimeUnit.MILLISECONDS); } @Override - public void triggerHeartbeat() throws IOException { + public void triggerHeartbeat() { if (heartbeatTrigger.compareAndSet(false, true)) { notifyLogAppender(); } @@ -133,6 +136,10 @@ public abstract class LogAppenderBase implements LogAppender { getLeaderState().restart(this); } + public long getMinWaitTimeMs() { + return waitTimeMinMs - getFollower().getLastRpcSendTime().elapsedTimeMs(); + } + @Override public final FollowerInfo getFollower() { return follower;
