This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new fe1cbbba8 RATIS-1886. AppendLog sleep fixed time cause significant
drop in write throughput. (#929)
fe1cbbba8 is described below
commit fe1cbbba8893323448544deb9368bd714a210529
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Oct 4 08:32:06 2023 -0700
RATIS-1886. AppendLog sleep fixed time cause significant drop in write
throughput. (#929)
---
.../java/org/apache/ratis/util/TimeDuration.java | 20 ++++++++-
.../apache/ratis/grpc/server/GrpcLogAppender.java | 22 +++++-----
.../apache/ratis/server/RaftServerConfigKeys.java | 2 +-
.../apache/ratis/server/impl/FollowerInfoImpl.java | 47 +++++++++++++++-------
.../apache/ratis/server/impl/LeaderStateImpl.java | 2 +-
.../ratis/server/leader/LogAppenderBase.java | 16 +++++---
.../org/apache/ratis/util/TestTimeDuration.java | 35 ++++++++++++----
7 files changed, 105 insertions(+), 39 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
index 10c5c6bd1..4d1fe4ef0 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
@@ -68,6 +68,14 @@ public final class TimeDuration implements
Comparable<TimeDuration> {
() -> new IllegalStateException("Failed to compute min(" + left + ", " + right + ")"));
}
+ /** @return the maximum of the given parameters. */
+ public static TimeDuration max(TimeDuration left, TimeDuration right) {
+ Objects.requireNonNull(left, "left == null");
+ Objects.requireNonNull(right, "right == null");
+ return Stream.of(left, right).max(TimeDuration::compareTo).orElseThrow(
+ () -> new IllegalStateException("Failed to compute max(" + left + ", " + right + ")"));
+ }
+
/** Abbreviations of {@link TimeUnit}. */
public enum Abbreviation {
NANOSECONDS("ns", "nanos"),
@@ -304,11 +312,21 @@ public final class TimeDuration implements
Comparable<TimeDuration> {
return function.apply(getDuration(), getUnit());
}
- /** @return Is this {@link TimeDuration} negative? */
+ /** @return Is this {@link TimeDuration} less than zero? */
public boolean isNegative() {
return duration < 0;
}
+ /** @return Is this {@link TimeDuration} greater than or equal to zero? */
+ public boolean isNonNegative() {
+ return duration >= 0;
+ }
+
+ /** @return Is this {@link TimeDuration} greater than zero? */
+ public boolean isPositive() {
+ return duration > 0;
+ }
+
/** @return Is this {@link TimeDuration} less than or equal to zero? */
public boolean isNonPositive() {
return duration <= 0;
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 07aecf6fe..422adf499 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -192,7 +192,7 @@ public class GrpcLogAppender extends LogAppenderBase {
// For normal nodes, new entries should be sent ASAP
// however for slow followers (especially when the follower is down),
// keep sending without any wait time only ends up in high CPU load
- return Math.max(getMinWaitTimeMs(), 0L);
+ return TimeDuration.max(getRemainingWaitTime(),
TimeDuration.ZERO).toLong(TimeUnit.MILLISECONDS);
}
return getHeartbeatWaitTimeMs();
}
@@ -246,14 +246,16 @@ public class GrpcLogAppender extends LogAppenderBase {
}
static class StreamObservers {
- public static final int DEFAULT_WAIT_FOR_READY_MS = 10;
private final CallStreamObserver<AppendEntriesRequestProto> appendLog;
private final CallStreamObserver<AppendEntriesRequestProto> heartbeat;
+ private final TimeDuration waitForReady;
private volatile boolean running = true;
- StreamObservers(GrpcServerProtocolClient client, AppendLogResponseHandler
handler, boolean separateHeartbeat) {
+ StreamObservers(GrpcServerProtocolClient client, AppendLogResponseHandler
handler, boolean separateHeartbeat,
+ TimeDuration waitTimeMin) {
this.appendLog = client.appendEntries(handler, false);
this.heartbeat = separateHeartbeat? client.appendEntries(handler, true):
null;
+ this.waitForReady = waitTimeMin.isPositive()? waitTimeMin:
TimeDuration.ONE_MILLISECOND;
}
void onNext(AppendEntriesRequestProto proto)
@@ -267,7 +269,7 @@ public class GrpcLogAppender extends LogAppenderBase {
}
// stall for stream to be ready.
while (!stream.isReady() && running) {
- sleep(DEFAULT_WAIT_FOR_READY_MS, isHeartBeat);
+ sleep(waitForReady, isHeartBeat);
}
stream.onNext(proto);
}
@@ -307,23 +309,23 @@ public class GrpcLogAppender extends LogAppenderBase {
increaseNextIndex(pending);
if (appendLogRequestObserver == null) {
appendLogRequestObserver = new StreamObservers(
- getClient(), new AppendLogResponseHandler(), useSeparateHBChannel);
+ getClient(), new AppendLogResponseHandler(), useSeparateHBChannel,
getWaitTimeMin());
}
}
- final long waitMs = getMinWaitTimeMs();
- if (waitMs > 0) {
- sleep(waitMs, heartbeat);
+ final TimeDuration remaining = getRemainingWaitTime();
+ if (remaining.isPositive()) {
+ sleep(remaining, heartbeat);
}
if (isRunning()) {
sendRequest(request, pending);
}
}
- private static void sleep(long waitMs, boolean heartbeat)
+ private static void sleep(TimeDuration waitTime, boolean heartbeat)
throws InterruptedIOException {
try {
- Thread.sleep(waitMs);
+ waitTime.sleep();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw IOUtils.toInterruptedIOException(
diff --git
a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index cd38e5667..eb367a982 100644
---
a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++
b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -584,7 +584,7 @@ public interface RaftServerConfigKeys {
}
String WAIT_TIME_MIN_KEY = PREFIX + ".wait-time.min";
- TimeDuration WAIT_TIME_MIN_DEFAULT = TimeDuration.valueOf(10,
TimeUnit.MILLISECONDS);
+ TimeDuration WAIT_TIME_MIN_DEFAULT = TimeDuration.ONE_MILLISECOND;
static TimeDuration waitTimeMin(RaftProperties properties) {
return
getTimeDuration(properties.getTimeDuration(WAIT_TIME_MIN_DEFAULT.getUnit()),
WAIT_TIME_MIN_KEY, WAIT_TIME_MIN_DEFAULT, getDefaultLog());
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
index 91ab90a20..9394398ac 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
@@ -26,13 +26,10 @@ import org.apache.ratis.server.raftlog.RaftLogIndex;
import org.apache.ratis.util.Timestamp;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
import java.util.function.Function;
class FollowerInfoImpl implements FollowerInfo {
private final String name;
- private final Consumer<Object> infoIndexChange;
- private final Consumer<Object> debugIndexChange;
private final AtomicReference<RaftPeer> peer;
private final Function<RaftPeerId, RaftPeer> getPeer;
@@ -50,8 +47,6 @@ class FollowerInfoImpl implements FollowerInfo {
FollowerInfoImpl(RaftGroupMemberId id, RaftPeer peer, Function<RaftPeerId,
RaftPeer> getPeer,
Timestamp lastRpcTime, long nextIndex, boolean caughtUp) {
this.name = id + "->" + peer.getId();
- this.infoIndexChange = s -> LOG.info("{}: {}", name, s);
- this.debugIndexChange = s -> LOG.debug("{}: {}", name, s);
this.peer = new AtomicReference<>(peer);
this.getPeer = getPeer;
@@ -63,6 +58,30 @@ class FollowerInfoImpl implements FollowerInfo {
this.caughtUp = caughtUp;
}
+ private void info(Object message) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("{}: {}", name, message);
+ }
+ }
+
+ private void info(String prefix, Object message) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("{}: {} {}", name, prefix, message);
+ }
+ }
+
+ private void debug(Object message) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{}: {}", name, message);
+ }
+ }
+
+ private void debug(String prefix, Object message) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{}: {} {}", name, prefix, message);
+ }
+ }
+
@Override
public long getMatchIndex() {
return matchIndex.get();
@@ -70,7 +89,7 @@ class FollowerInfoImpl implements FollowerInfo {
@Override
public boolean updateMatchIndex(long newMatchIndex) {
- return matchIndex.updateToMax(newMatchIndex, debugIndexChange);
+ return matchIndex.updateToMax(newMatchIndex, this::debug);
}
@Override
@@ -80,7 +99,7 @@ class FollowerInfoImpl implements FollowerInfo {
@Override
public boolean updateCommitIndex(long newCommitIndex) {
- return commitIndex.updateToMax(newCommitIndex, debugIndexChange);
+ return commitIndex.updateToMax(newCommitIndex, this::debug);
}
@Override
@@ -95,32 +114,32 @@ class FollowerInfoImpl implements FollowerInfo {
@Override
public void increaseNextIndex(long newNextIndex) {
- nextIndex.updateIncreasingly(newNextIndex, debugIndexChange);
+ nextIndex.updateIncreasingly(newNextIndex, this::debug);
}
@Override
public void decreaseNextIndex(long newNextIndex) {
nextIndex.updateUnconditionally(old -> old <= 0L? old: Math.min(old - 1,
newNextIndex),
- message -> infoIndexChange.accept("decreaseNextIndex " + message));
+ message -> info("decreaseNextIndex", message));
}
@Override
public void setNextIndex(long newNextIndex) {
nextIndex.updateUnconditionally(old -> newNextIndex >= 0 ? newNextIndex :
old,
- message -> infoIndexChange.accept("setNextIndex " + message));
+ message -> info("setNextIndex", message));
}
@Override
public void updateNextIndex(long newNextIndex) {
nextIndex.updateToMax(newNextIndex,
- message -> infoIndexChange.accept("decreaseNextIndex " + message));
+ message -> debug("updateNextIndex", message));
}
@Override
public void setSnapshotIndex(long newSnapshotIndex) {
- snapshotIndex.setUnconditionally(newSnapshotIndex, infoIndexChange);
- matchIndex.setUnconditionally(newSnapshotIndex, infoIndexChange);
- nextIndex.setUnconditionally(newSnapshotIndex + 1, infoIndexChange);
+ snapshotIndex.setUnconditionally(newSnapshotIndex, this::info);
+ matchIndex.setUnconditionally(newSnapshotIndex, this::info);
+ nextIndex.setUnconditionally(newSnapshotIndex + 1, this::info);
}
@Override
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 4ebfc3d56..44468fb53 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -1049,7 +1049,7 @@ class LeaderStateImpl implements LeaderState {
*/
public boolean checkLeadership() {
- if (!server.getInfo().isLeader()) {
+ if (!server.getRole().getLeaderState().filter(leader -> leader ==
this).isPresent()) {
return false;
}
// The initial value of lastRpcResponseTime in FollowerInfo is set by
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index 1c0f61836..d57c46a34 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -33,6 +33,7 @@ import org.apache.ratis.util.DataQueue;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.TimeDuration;
import java.util.Collections;
import java.util.List;
@@ -56,7 +57,7 @@ public abstract class LogAppenderBase implements LogAppender {
private final AwaitForSignal eventAwaitForSignal;
private final AtomicBoolean heartbeatTrigger = new AtomicBoolean();
- private final long waitTimeMinMs;
+ private final TimeDuration waitTimeMin;
protected LogAppenderBase(RaftServer.Division server, LeaderState
leaderState, FollowerInfo f) {
this.follower = f;
@@ -73,7 +74,7 @@ public abstract class LogAppenderBase implements LogAppender {
this.daemon = new LogAppenderDaemon(this);
this.eventAwaitForSignal = new AwaitForSignal(name);
- this.waitTimeMinMs =
RaftServerConfigKeys.Log.Appender.waitTimeMin(properties).toLong(TimeUnit.MILLISECONDS);
+ this.waitTimeMin =
RaftServerConfigKeys.Log.Appender.waitTimeMin(properties);
}
@Override
@@ -136,8 +137,12 @@ public abstract class LogAppenderBase implements
LogAppender {
getLeaderState().restart(this);
}
- public long getMinWaitTimeMs() {
- return waitTimeMinMs - getFollower().getLastRpcSendTime().elapsedTimeMs();
+ protected TimeDuration getWaitTimeMin() {
+ return waitTimeMin;
+ }
+
+ protected TimeDuration getRemainingWaitTime() {
+ return
waitTimeMin.add(getFollower().getLastRpcSendTime().elapsedTime().negate());
}
@Override
@@ -203,7 +208,8 @@ public abstract class LogAppenderBase implements
LogAppender {
}
final List<LogEntryProto> protos =
buffer.pollList(getHeartbeatWaitTimeMs(), EntryWithData::getEntry,
- (entry, time, exception) -> LOG.warn("Failed to get {} in {}: {}",
entry, time, exception));
+ (entry, time, exception) -> LOG.warn("Failed to get " + entry
+ + " in " + time.toString(TimeUnit.MILLISECONDS, 3), exception));
buffer.clear();
assertProtos(protos, followerNext, previous, snapshotIndex);
return leaderState.newAppendEntriesRequestProto(follower, protos,
previous, callId);
diff --git
a/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java
b/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java
index 2301a1073..a2c180d92 100644
--- a/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java
@@ -27,9 +27,9 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.apache.ratis.util.TimeDuration.Abbreviation;
-import static org.apache.ratis.util.TimeDuration.LOG;
import static org.apache.ratis.util.TimeDuration.parse;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -249,14 +249,35 @@ public class TestTimeDuration {
@Test(timeout = 1000)
public void testCompareTo() {
- assertTrue(TimeDuration.ONE_SECOND.compareTo(TimeDuration.ONE_MINUTE) < 0);
- assertTrue(TimeDuration.ONE_MINUTE.compareTo(TimeDuration.ONE_SECOND) > 0);
+ assertTimeDurationCompareTo(TimeDuration.ONE_MINUTE,
TimeDuration.ONE_SECOND);
- assertTrue(TimeDuration.valueOf(15,
TimeUnit.SECONDS).compareTo(TimeDuration.ONE_MINUTE) < 0);
- assertTrue(TimeDuration.ONE_MINUTE.compareTo(TimeDuration.valueOf(15,
TimeUnit.SECONDS)) > 0);
+ final TimeDuration fifteenSecond = TimeDuration.valueOf(15,
TimeUnit.SECONDS);
+ assertTimeDurationCompareTo(TimeDuration.ONE_DAY, fifteenSecond);
- assertEquals(0, TimeDuration.valueOf(60,
TimeUnit.SECONDS).compareTo(TimeDuration.ONE_MINUTE));
- assertEquals(0, TimeDuration.ONE_MINUTE.compareTo(TimeDuration.valueOf(60,
TimeUnit.SECONDS)));
+ assertTimeDurationEquals(TimeDuration.ONE_MINUTE,
fifteenSecond.multiply(4));
+ assertTimeDurationEquals(TimeDuration.ONE_DAY,
TimeDuration.ONE_MINUTE.multiply(60).multiply(24));
+ }
+
+ static void assertTimeDurationEquals(TimeDuration left, TimeDuration right) {
+ assertEquals(0, left.compareTo(right));
+ assertEquals(0, right.compareTo(left));
+ assertEquals(left, right);
+ assertEquals(right, left);
+ }
+
+ static void assertTimeDurationCompareTo(TimeDuration larger, TimeDuration
smaller) {
+ assertTrue(smaller.compareTo(larger) < 0);
+ assertTrue(larger.compareTo(smaller) > 0);
+ assertEquals(smaller, TimeDuration.min(smaller, larger));
+ assertEquals(smaller, TimeDuration.min(larger, smaller));
+ assertEquals(larger, TimeDuration.max(smaller, larger));
+ assertEquals(larger, TimeDuration.max(larger, smaller));
+
+ final TimeDuration diff = larger.add(smaller.negate());
+ assertTrue(diff.isPositive());
+ assertTrue(diff.isNonNegative());
+ assertFalse(diff.isNegative());
+ assertFalse(diff.isNonPositive());
}
private static void assertHigherLower(TimeUnit lower, TimeUnit higher) {