This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new fe1cbbba8 RATIS-1886. AppendLog sleep fixed time cause significant drop in write throughput. (#929)
fe1cbbba8 is described below

commit fe1cbbba8893323448544deb9368bd714a210529
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Oct 4 08:32:06 2023 -0700

    RATIS-1886. AppendLog sleep fixed time cause significant drop in write throughput. (#929)
---
 .../java/org/apache/ratis/util/TimeDuration.java   | 20 ++++++++-
 .../apache/ratis/grpc/server/GrpcLogAppender.java  | 22 +++++-----
 .../apache/ratis/server/RaftServerConfigKeys.java  |  2 +-
 .../apache/ratis/server/impl/FollowerInfoImpl.java | 47 +++++++++++++++-------
 .../apache/ratis/server/impl/LeaderStateImpl.java  |  2 +-
 .../ratis/server/leader/LogAppenderBase.java       | 16 +++++---
 .../org/apache/ratis/util/TestTimeDuration.java    | 35 ++++++++++++----
 7 files changed, 105 insertions(+), 39 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java 
b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
index 10c5c6bd1..4d1fe4ef0 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
@@ -68,6 +68,14 @@ public final class TimeDuration implements 
Comparable<TimeDuration> {
         () -> new IllegalStateException("Failed to compute min(" + left + ", " 
+ right + ")"));
   }
 
+  /** @return the maximum of the given parameters. */
+  public static TimeDuration max(TimeDuration left, TimeDuration right) {
+    Objects.requireNonNull(left, "left == null");
+    Objects.requireNonNull(right, "right == null");
+    return Stream.of(left, right).max(TimeDuration::compareTo).orElseThrow(
+        () -> new IllegalStateException("Failed to compute max(" + left + ", " 
+ right + ")"));
+  }
+
   /** Abbreviations of {@link TimeUnit}. */
   public enum Abbreviation {
     NANOSECONDS("ns", "nanos"),
@@ -304,11 +312,21 @@ public final class TimeDuration implements 
Comparable<TimeDuration> {
     return function.apply(getDuration(), getUnit());
   }
 
-  /** @return Is this {@link TimeDuration} negative? */
+  /** @return Is this {@link TimeDuration} less than zero? */
   public boolean isNegative() {
     return duration < 0;
   }
 
+  /** @return Is this {@link TimeDuration} greater than or equal to zero? */
+  public boolean isNonNegative() {
+    return duration >= 0;
+  }
+
+  /** @return Is this {@link TimeDuration} greater than zero? */
+  public boolean isPositive() {
+    return duration > 0;
+  }
+
   /** @return Is this {@link TimeDuration} less than or equal to zero? */
   public boolean isNonPositive() {
     return duration <= 0;
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 07aecf6fe..422adf499 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -192,7 +192,7 @@ public class GrpcLogAppender extends LogAppenderBase {
       // For normal nodes, new entries should be sent ASAP
       // however for slow followers (especially when the follower is down),
       // keep sending without any wait time only ends up in high CPU load
-      return Math.max(getMinWaitTimeMs(), 0L);
+      return TimeDuration.max(getRemainingWaitTime(), 
TimeDuration.ZERO).toLong(TimeUnit.MILLISECONDS);
     }
     return getHeartbeatWaitTimeMs();
   }
@@ -246,14 +246,16 @@ public class GrpcLogAppender extends LogAppenderBase {
   }
 
   static class StreamObservers {
-    public static final int DEFAULT_WAIT_FOR_READY_MS = 10;
     private final CallStreamObserver<AppendEntriesRequestProto> appendLog;
     private final CallStreamObserver<AppendEntriesRequestProto> heartbeat;
+    private final TimeDuration waitForReady;
     private volatile boolean running = true;
 
-    StreamObservers(GrpcServerProtocolClient client, AppendLogResponseHandler 
handler, boolean separateHeartbeat) {
+    StreamObservers(GrpcServerProtocolClient client, AppendLogResponseHandler 
handler, boolean separateHeartbeat,
+        TimeDuration waitTimeMin) {
       this.appendLog = client.appendEntries(handler, false);
       this.heartbeat = separateHeartbeat? client.appendEntries(handler, true): 
null;
+      this.waitForReady = waitTimeMin.isPositive()? waitTimeMin: 
TimeDuration.ONE_MILLISECOND;
     }
 
     void onNext(AppendEntriesRequestProto proto)
@@ -267,7 +269,7 @@ public class GrpcLogAppender extends LogAppenderBase {
       }
       // stall for stream to be ready.
       while (!stream.isReady() && running) {
-        sleep(DEFAULT_WAIT_FOR_READY_MS, isHeartBeat);
+        sleep(waitForReady, isHeartBeat);
       }
       stream.onNext(proto);
     }
@@ -307,23 +309,23 @@ public class GrpcLogAppender extends LogAppenderBase {
       increaseNextIndex(pending);
       if (appendLogRequestObserver == null) {
         appendLogRequestObserver = new StreamObservers(
-            getClient(), new AppendLogResponseHandler(), useSeparateHBChannel);
+            getClient(), new AppendLogResponseHandler(), useSeparateHBChannel, 
getWaitTimeMin());
       }
     }
 
-    final long waitMs = getMinWaitTimeMs();
-    if (waitMs > 0) {
-      sleep(waitMs, heartbeat);
+    final TimeDuration remaining = getRemainingWaitTime();
+    if (remaining.isPositive()) {
+      sleep(remaining, heartbeat);
     }
     if (isRunning()) {
       sendRequest(request, pending);
     }
   }
 
-  private static void sleep(long waitMs, boolean heartbeat)
+  private static void sleep(TimeDuration waitTime, boolean heartbeat)
       throws InterruptedIOException {
     try {
-      Thread.sleep(waitMs);
+      waitTime.sleep();
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw IOUtils.toInterruptedIOException(
diff --git 
a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
 
b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index cd38e5667..eb367a982 100644
--- 
a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ 
b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -584,7 +584,7 @@ public interface RaftServerConfigKeys {
       }
 
       String WAIT_TIME_MIN_KEY = PREFIX + ".wait-time.min";
-      TimeDuration WAIT_TIME_MIN_DEFAULT = TimeDuration.valueOf(10, 
TimeUnit.MILLISECONDS);
+      TimeDuration WAIT_TIME_MIN_DEFAULT = TimeDuration.ONE_MILLISECOND;
       static TimeDuration waitTimeMin(RaftProperties properties) {
         return 
getTimeDuration(properties.getTimeDuration(WAIT_TIME_MIN_DEFAULT.getUnit()),
             WAIT_TIME_MIN_KEY, WAIT_TIME_MIN_DEFAULT, getDefaultLog());
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
index 91ab90a20..9394398ac 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
@@ -26,13 +26,10 @@ import org.apache.ratis.server.raftlog.RaftLogIndex;
 import org.apache.ratis.util.Timestamp;
 
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
 import java.util.function.Function;
 
 class FollowerInfoImpl implements FollowerInfo {
   private final String name;
-  private final Consumer<Object> infoIndexChange;
-  private final Consumer<Object> debugIndexChange;
 
   private final AtomicReference<RaftPeer> peer;
   private final Function<RaftPeerId, RaftPeer> getPeer;
@@ -50,8 +47,6 @@ class FollowerInfoImpl implements FollowerInfo {
   FollowerInfoImpl(RaftGroupMemberId id, RaftPeer peer, Function<RaftPeerId, 
RaftPeer> getPeer,
       Timestamp lastRpcTime, long nextIndex, boolean caughtUp) {
     this.name = id + "->" + peer.getId();
-    this.infoIndexChange = s -> LOG.info("{}: {}", name, s);
-    this.debugIndexChange = s -> LOG.debug("{}: {}", name, s);
 
     this.peer = new AtomicReference<>(peer);
     this.getPeer = getPeer;
@@ -63,6 +58,30 @@ class FollowerInfoImpl implements FollowerInfo {
     this.caughtUp = caughtUp;
   }
 
+  private void info(Object message) {
+    if (LOG.isInfoEnabled()) {
+      LOG.info("{}: {}", name, message);
+    }
+  }
+
+  private void info(String prefix, Object message) {
+    if (LOG.isInfoEnabled()) {
+      LOG.info("{}: {} {}", name, prefix, message);
+    }
+  }
+
+  private void debug(Object message) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("{}: {}", name, message);
+    }
+  }
+
+  private void debug(String prefix, Object message) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("{}: {} {}", name, prefix, message);
+    }
+  }
+
   @Override
   public long getMatchIndex() {
     return matchIndex.get();
@@ -70,7 +89,7 @@ class FollowerInfoImpl implements FollowerInfo {
 
   @Override
   public boolean updateMatchIndex(long newMatchIndex) {
-    return matchIndex.updateToMax(newMatchIndex, debugIndexChange);
+    return matchIndex.updateToMax(newMatchIndex, this::debug);
   }
 
   @Override
@@ -80,7 +99,7 @@ class FollowerInfoImpl implements FollowerInfo {
 
   @Override
   public boolean updateCommitIndex(long newCommitIndex) {
-    return commitIndex.updateToMax(newCommitIndex, debugIndexChange);
+    return commitIndex.updateToMax(newCommitIndex, this::debug);
   }
 
   @Override
@@ -95,32 +114,32 @@ class FollowerInfoImpl implements FollowerInfo {
 
   @Override
   public void increaseNextIndex(long newNextIndex) {
-    nextIndex.updateIncreasingly(newNextIndex, debugIndexChange);
+    nextIndex.updateIncreasingly(newNextIndex, this::debug);
   }
 
   @Override
   public void decreaseNextIndex(long newNextIndex) {
     nextIndex.updateUnconditionally(old -> old <= 0L? old: Math.min(old - 1, 
newNextIndex),
-        message -> infoIndexChange.accept("decreaseNextIndex " + message));
+        message -> info("decreaseNextIndex", message));
   }
 
   @Override
   public void setNextIndex(long newNextIndex) {
     nextIndex.updateUnconditionally(old -> newNextIndex >= 0 ? newNextIndex : 
old,
-        message -> infoIndexChange.accept("setNextIndex " + message));
+        message -> info("setNextIndex", message));
   }
 
   @Override
   public void updateNextIndex(long newNextIndex) {
     nextIndex.updateToMax(newNextIndex,
-        message -> infoIndexChange.accept("decreaseNextIndex " + message));
+        message -> debug("updateNextIndex", message));
   }
 
   @Override
   public void setSnapshotIndex(long newSnapshotIndex) {
-    snapshotIndex.setUnconditionally(newSnapshotIndex, infoIndexChange);
-    matchIndex.setUnconditionally(newSnapshotIndex, infoIndexChange);
-    nextIndex.setUnconditionally(newSnapshotIndex + 1, infoIndexChange);
+    snapshotIndex.setUnconditionally(newSnapshotIndex, this::info);
+    matchIndex.setUnconditionally(newSnapshotIndex, this::info);
+    nextIndex.setUnconditionally(newSnapshotIndex + 1, this::info);
   }
 
   @Override
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 4ebfc3d56..44468fb53 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -1049,7 +1049,7 @@ class LeaderStateImpl implements LeaderState {
    */
 
   public boolean checkLeadership() {
-    if (!server.getInfo().isLeader()) {
+    if (!server.getRole().getLeaderState().filter(leader -> leader == 
this).isPresent()) {
       return false;
     }
     // The initial value of lastRpcResponseTime in FollowerInfo is set by
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index 1c0f61836..d57c46a34 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -33,6 +33,7 @@ import org.apache.ratis.util.DataQueue;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.TimeDuration;
 
 import java.util.Collections;
 import java.util.List;
@@ -56,7 +57,7 @@ public abstract class LogAppenderBase implements LogAppender {
   private final AwaitForSignal eventAwaitForSignal;
 
   private final AtomicBoolean heartbeatTrigger = new AtomicBoolean();
-  private final long waitTimeMinMs;
+  private final TimeDuration waitTimeMin;
 
   protected LogAppenderBase(RaftServer.Division server, LeaderState 
leaderState, FollowerInfo f) {
     this.follower = f;
@@ -73,7 +74,7 @@ public abstract class LogAppenderBase implements LogAppender {
     this.daemon = new LogAppenderDaemon(this);
     this.eventAwaitForSignal = new AwaitForSignal(name);
 
-    this.waitTimeMinMs = 
RaftServerConfigKeys.Log.Appender.waitTimeMin(properties).toLong(TimeUnit.MILLISECONDS);
+    this.waitTimeMin = 
RaftServerConfigKeys.Log.Appender.waitTimeMin(properties);
   }
 
   @Override
@@ -136,8 +137,12 @@ public abstract class LogAppenderBase implements 
LogAppender {
     getLeaderState().restart(this);
   }
 
-  public long getMinWaitTimeMs() {
-    return waitTimeMinMs - getFollower().getLastRpcSendTime().elapsedTimeMs();
+  protected TimeDuration getWaitTimeMin() {
+    return waitTimeMin;
+  }
+
+  protected TimeDuration getRemainingWaitTime() {
+    return 
waitTimeMin.add(getFollower().getLastRpcSendTime().elapsedTime().negate());
   }
 
   @Override
@@ -203,7 +208,8 @@ public abstract class LogAppenderBase implements 
LogAppender {
     }
 
     final List<LogEntryProto> protos = 
buffer.pollList(getHeartbeatWaitTimeMs(), EntryWithData::getEntry,
-        (entry, time, exception) -> LOG.warn("Failed to get {} in {}: {}", 
entry, time, exception));
+        (entry, time, exception) -> LOG.warn("Failed to get " + entry
+            + " in " + time.toString(TimeUnit.MILLISECONDS, 3), exception));
     buffer.clear();
     assertProtos(protos, followerNext, previous, snapshotIndex);
     return leaderState.newAppendEntriesRequestProto(follower, protos, 
previous, callId);
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java 
b/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java
index 2301a1073..a2c180d92 100644
--- a/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java
@@ -27,9 +27,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.apache.ratis.util.TimeDuration.Abbreviation;
-import static org.apache.ratis.util.TimeDuration.LOG;
 import static org.apache.ratis.util.TimeDuration.parse;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -249,14 +249,35 @@ public class TestTimeDuration {
 
   @Test(timeout = 1000)
   public void testCompareTo() {
-    assertTrue(TimeDuration.ONE_SECOND.compareTo(TimeDuration.ONE_MINUTE) < 0);
-    assertTrue(TimeDuration.ONE_MINUTE.compareTo(TimeDuration.ONE_SECOND) > 0);
+    assertTimeDurationCompareTo(TimeDuration.ONE_MINUTE, 
TimeDuration.ONE_SECOND);
 
-    assertTrue(TimeDuration.valueOf(15, 
TimeUnit.SECONDS).compareTo(TimeDuration.ONE_MINUTE) < 0);
-    assertTrue(TimeDuration.ONE_MINUTE.compareTo(TimeDuration.valueOf(15, 
TimeUnit.SECONDS)) > 0);
+    final TimeDuration fifteenSecond = TimeDuration.valueOf(15, 
TimeUnit.SECONDS);
+    assertTimeDurationCompareTo(TimeDuration.ONE_DAY, fifteenSecond);
 
-    assertEquals(0, TimeDuration.valueOf(60, 
TimeUnit.SECONDS).compareTo(TimeDuration.ONE_MINUTE));
-    assertEquals(0, TimeDuration.ONE_MINUTE.compareTo(TimeDuration.valueOf(60, 
TimeUnit.SECONDS)));
+    assertTimeDurationEquals(TimeDuration.ONE_MINUTE, 
fifteenSecond.multiply(4));
+    assertTimeDurationEquals(TimeDuration.ONE_DAY, 
TimeDuration.ONE_MINUTE.multiply(60).multiply(24));
+  }
+
+  static void assertTimeDurationEquals(TimeDuration left, TimeDuration right) {
+    assertEquals(0, left.compareTo(right));
+    assertEquals(0, right.compareTo(left));
+    assertEquals(left, right);
+    assertEquals(right, left);
+  }
+
+  static void assertTimeDurationCompareTo(TimeDuration larger, TimeDuration 
smaller) {
+    assertTrue(smaller.compareTo(larger) < 0);
+    assertTrue(larger.compareTo(smaller) > 0);
+    assertEquals(smaller, TimeDuration.min(smaller, larger));
+    assertEquals(smaller, TimeDuration.min(larger, smaller));
+    assertEquals(larger, TimeDuration.max(smaller, larger));
+    assertEquals(larger, TimeDuration.max(larger, smaller));
+
+    final TimeDuration diff = larger.add(smaller.negate());
+    assertTrue(diff.isPositive());
+    assertTrue(diff.isNonNegative());
+    assertFalse(diff.isNegative());
+    assertFalse(diff.isNonPositive());
   }
 
   private static void assertHigherLower(TimeUnit lower, TimeUnit higher) {

Reply via email to