This is an automated email from the ASF dual-hosted git repository.
williamsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new fdd7c5c98 RATIS-1793. Enforce raft.server.log.appender.wait-time.min.
(#832)
fdd7c5c98 is described below
commit fdd7c5c98beaeed6aa33261752554575282e1a43
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Tue Feb 28 05:00:02 2023 -0800
RATIS-1793. Enforce raft.server.log.appender.wait-time.min. (#832)
---
.../apache/ratis/grpc/server/GrpcLogAppender.java | 30 +++++++++++-----------
.../apache/ratis/server/leader/FollowerInfo.java | 3 +++
.../apache/ratis/server/leader/LogAppender.java | 4 +--
.../apache/ratis/server/raftlog/RaftLogIndex.java | 30 ++++++++++++++++------
.../apache/ratis/server/impl/FollowerInfoImpl.java | 14 +++++++---
.../apache/ratis/server/impl/LeaderStateImpl.java | 8 +-----
.../ratis/server/leader/LogAppenderBase.java | 11 ++++++--
7 files changed, 63 insertions(+), 37 deletions(-)
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index b9b88f471..a61726177 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -48,7 +48,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
/**
* A new log appender implementation using grpc bi-directional stream API.
@@ -72,9 +71,6 @@ public class GrpcLogAppender extends LogAppenderBase {
private final TimeDuration requestTimeoutDuration;
private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
- private final long waitTimeMinMs;
- private final AtomicReference<Timestamp> lastAppendEntries;
-
private volatile StreamObservers appendLogRequestObserver;
private final boolean useSeparateHBChannel;
@@ -94,10 +90,6 @@ public class GrpcLogAppender extends LogAppenderBase {
this.installSnapshotEnabled =
RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
this.useSeparateHBChannel =
GrpcConfigKeys.Server.heartbeatChannel(properties);
- final TimeDuration waitTimeMin =
RaftServerConfigKeys.Log.Appender.waitTimeMin(properties);
- this.waitTimeMinMs = waitTimeMin.toLong(TimeUnit.MILLISECONDS);
- this.lastAppendEntries = new
AtomicReference<>(Timestamp.currentTime().addTime(waitTimeMin.negate()));
-
grpcServerMetrics = new GrpcServerMetrics(server.getMemberId().toString());
grpcServerMetrics.addPendingRequestsCount(getFollowerId().toString(),
pendingRequests::logRequestsSize);
@@ -180,10 +172,9 @@ public class GrpcLogAppender extends LogAppenderBase {
// For normal nodes, new entries should be sent ASAP
// however for slow followers (especially when the follower is down),
// keep sending without any wait time only ends up in high CPU load
- final long min = waitTimeMinMs - lastAppendEntries.get().elapsedTimeMs();
- return Math.max(0L, min);
+ return Math.max(getMinWaitTimeMs(), 0L);
}
- return Math.min(waitTimeMinMs, getHeartbeatWaitTimeMs());
+ return Math.min(getMinWaitTimeMs(), getHeartbeatWaitTimeMs());
}
private boolean isSlowFollower() {
@@ -261,13 +252,13 @@ public class GrpcLogAppender extends LogAppenderBase {
return CALL_ID_COMPARATOR;
}
- private void appendLog(boolean excludeLogEntries) throws IOException {
+ private void appendLog(boolean heartbeat) throws IOException {
final AppendEntriesRequestProto pending;
final AppendEntriesRequest request;
try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
// Prepare and send the append request.
// Note changes on follower's nextIndex and ops on pendingRequests
should always be done under the write-lock
- pending = newAppendEntriesRequest(callId.getAndIncrement(),
excludeLogEntries);
+ pending = newAppendEntriesRequest(callId.getAndIncrement(), heartbeat);
if (pending == null) {
return;
}
@@ -280,6 +271,16 @@ public class GrpcLogAppender extends LogAppenderBase {
}
}
+ final long waitMs = getMinWaitTimeMs();
+ if (waitMs > 0) {
+ try {
+ Thread.sleep(waitMs);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw IOUtils.toInterruptedIOException(
+ "Interrupted appendLog, heartbeat? " + heartbeat, e);
+ }
+ }
if (isRunning()) {
sendRequest(request, pending);
}
@@ -293,15 +294,14 @@ public class GrpcLogAppender extends LogAppenderBase {
.map(observer -> {
request.startRequestTimer();
observer.onNext(proto);
- lastAppendEntries.set(Timestamp.currentTime());
return true;
}).isPresent();
if (sent) {
+ getFollower().updateLastRpcSendTime(request.isHeartbeat());
scheduler.onTimeout(requestTimeoutDuration,
() -> timeoutAppendRequest(request.getCallId(),
request.isHeartbeat()),
LOG, () -> "Timeout check failed for append entry request: " +
request);
- getFollower().updateLastRpcSendTime(request.isHeartbeat());
}
}
diff --git
a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
index 7f7e4662c..1dd4066e8 100644
---
a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
+++
b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
@@ -87,6 +87,9 @@ public interface FollowerInfo {
/** @return the lastRpcResponseTime . */
Timestamp getLastRpcResponseTime();
+ /** @return the lastRpcSendTime . */
+ Timestamp getLastRpcSendTime();
+
/** Update lastRpcResponseTime to the current time. */
void updateLastRpcResponseTime();
diff --git
a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
index 20b18882b..342c48528 100644
---
a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
+++
b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
@@ -166,8 +166,8 @@ public interface LogAppender {
return getFollower().getNextIndex() < getRaftLog().getNextIndex();
}
- /** send a heartbeat AppendEntries immediately */
- void triggerHeartbeat() throws IOException;
+ /** Trigger to send a heartbeat AppendEntries. */
+ void triggerHeartbeat();
/** @return the wait time in milliseconds to send the next heartbeat. */
default long getHeartbeatWaitTimeMs() {
diff --git
a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogIndex.java
b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogIndex.java
index ed545e291..290a58835 100644
---
a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogIndex.java
+++
b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogIndex.java
@@ -44,30 +44,44 @@ public class RaftLogIndex {
public boolean setUnconditionally(long newIndex, Consumer<Object> log) {
final long old = index.getAndSet(newIndex);
- log.accept(StringUtils.stringSupplierAsObject(() -> name + ":
setUnconditionally " + old + " -> " + newIndex));
- return old != newIndex;
+ final boolean updated = old != newIndex;
+ if (updated) {
+ log.accept(StringUtils.stringSupplierAsObject(
+ () -> name + ": setUnconditionally " + old + " -> " + newIndex));
+ }
+ return updated;
}
public boolean updateUnconditionally(LongUnaryOperator update,
Consumer<Object> log) {
final long old = index.getAndUpdate(update);
final long newIndex = update.applyAsLong(old);
- log.accept(StringUtils.stringSupplierAsObject(() -> name + ":
updateUnconditionally " + old + " -> " + newIndex));
- return old != newIndex;
+ final boolean updated = old != newIndex;
+ if (updated) {
+ log.accept(StringUtils.stringSupplierAsObject(
+ () -> name + ": updateUnconditionally " + old + " -> " + newIndex));
+ }
+ return updated;
}
public boolean updateIncreasingly(long newIndex, Consumer<Object> log) {
final long old = index.getAndSet(newIndex);
Preconditions.assertTrue(old <= newIndex,
() -> "Failed to updateIncreasingly for " + name + ": " + old + " -> "
+ newIndex);
- log.accept(StringUtils.stringSupplierAsObject(() -> name + ":
updateIncreasingly " + old + " -> " + newIndex));
- return old != newIndex;
+ final boolean updated = old != newIndex;
+ if (updated) {
+ log.accept(StringUtils.stringSupplierAsObject(
+ () -> name + ": updateIncreasingly " + old + " -> " + newIndex));
+ }
+ return updated;
}
public boolean updateToMax(long newIndex, Consumer<Object> log) {
final long old = index.getAndUpdate(oldIndex -> Math.max(oldIndex,
newIndex));
final boolean updated = old < newIndex;
- log.accept(StringUtils.stringSupplierAsObject(
- () -> name + ": updateToMax old=" + old + ", new=" + newIndex + ",
updated? " + updated));
+ if (updated) {
+ log.accept(StringUtils.stringSupplierAsObject(
+ () -> name + ": updateToMax old=" + old + ", new=" + newIndex + ",
updated? " + updated));
+ }
return updated;
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
index c1cb7962c..891a01c76 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
@@ -98,17 +98,20 @@ class FollowerInfoImpl implements FollowerInfo {
@Override
public void decreaseNextIndex(long newNextIndex) {
- nextIndex.updateUnconditionally(old -> old <= 0L? old: Math.min(old - 1,
newNextIndex), infoIndexChange);
+ nextIndex.updateUnconditionally(old -> old <= 0L? old: Math.min(old - 1,
newNextIndex),
+ message -> infoIndexChange.accept("decreaseNextIndex " + message));
}
@Override
public void setNextIndex(long newNextIndex) {
- nextIndex.updateUnconditionally(old -> newNextIndex >= 0 ? newNextIndex :
old, infoIndexChange);
+ nextIndex.updateUnconditionally(old -> newNextIndex >= 0 ? newNextIndex :
old,
+ message -> infoIndexChange.accept("setNextIndex " + message));
}
@Override
public void updateNextIndex(long newNextIndex) {
- nextIndex.updateToMax(newNextIndex, infoIndexChange);
+ nextIndex.updateToMax(newNextIndex,
+ message -> infoIndexChange.accept("updateNextIndex " + message));
}
@Override
@@ -176,6 +179,11 @@ class FollowerInfoImpl implements FollowerInfo {
return lastRpcResponseTime.get();
}
+ @Override
+ public Timestamp getLastRpcSendTime() {
+ return lastRpcSendTime.get();
+ }
+
@Override
public void updateLastRpcSendTime(boolean isHeartbeat) {
final Timestamp currentTime = Timestamp.currentTime();
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index af4ba0ada..694183bcf 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -1140,13 +1140,7 @@ class LeaderStateImpl implements LeaderState {
}
if (supplier.isInitialized()) {
- senders.forEach(sender -> {
- try {
- sender.triggerHeartbeat();
- } catch (IOException e) {
- LOG.warn("{}: {} cannot trigger heartbeat due to {}", this, sender,
e);
- }
- });
+ senders.forEach(LogAppender::triggerHeartbeat);
}
return listener.getFuture();
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index bc8a31181..1c0f61836 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -34,10 +34,10 @@ import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.SizeInBytes;
-import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -56,6 +56,7 @@ public abstract class LogAppenderBase implements LogAppender {
private final AwaitForSignal eventAwaitForSignal;
private final AtomicBoolean heartbeatTrigger = new AtomicBoolean();
+ private final long waitTimeMinMs;
protected LogAppenderBase(RaftServer.Division server, LeaderState
leaderState, FollowerInfo f) {
this.follower = f;
@@ -71,10 +72,12 @@ public abstract class LogAppenderBase implements
LogAppender {
this.buffer = new DataQueue<>(this, bufferByteLimit, bufferElementLimit,
EntryWithData::getSerializedSize);
this.daemon = new LogAppenderDaemon(this);
this.eventAwaitForSignal = new AwaitForSignal(name);
+
+ this.waitTimeMinMs =
RaftServerConfigKeys.Log.Appender.waitTimeMin(properties).toLong(TimeUnit.MILLISECONDS);
}
@Override
- public void triggerHeartbeat() throws IOException {
+ public void triggerHeartbeat() {
if (heartbeatTrigger.compareAndSet(false, true)) {
notifyLogAppender();
}
@@ -133,6 +136,10 @@ public abstract class LogAppenderBase implements
LogAppender {
getLeaderState().restart(this);
}
+ public long getMinWaitTimeMs() {
+ return waitTimeMinMs - getFollower().getLastRpcSendTime().elapsedTimeMs();
+ }
+
@Override
public final FollowerInfo getFollower() {
return follower;