This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 9230867d1 RATIS-1706. Move heartbeat listeners to LeaderState (#743)
9230867d1 is described below

commit 9230867d19953583b817c7b56ccd4a933fb3835a
Author: William Song <[email protected]>
AuthorDate: Tue Sep 20 17:53:06 2022 +0800

    RATIS-1706. Move heartbeat listeners to LeaderState (#743)
---
 .../apache/ratis/server/RaftServerConfigKeys.java  |   2 +-
 .../apache/ratis/server/impl/LeaderStateImpl.java  |  11 +-
 ...{ReadRequests.java => ReadIndexHeartbeats.java} |  80 ++----------
 .../org/apache/ratis/server/impl/ReadRequests.java | 135 ---------------------
 .../org/apache/ratis/ReadOnlyRequestTests.java     |  31 ++---
 5 files changed, 27 insertions(+), 232 deletions(-)

diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 5ae1020c5..47e50a048 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -179,7 +179,7 @@ public interface RaftServerConfigKeys {
       LINEARIZABLE
     }
 
-    String OPTION_KEY = ".option";
+    String OPTION_KEY = PREFIX + ".option";
     Option OPTION_DEFAULT = Option.DEFAULT;
     static Option option(RaftProperties properties) {
       Option option =  get(properties::getEnum, OPTION_KEY, OPTION_DEFAULT, getDefaultLog());
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index d51a3bcfc..74649c566 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -40,7 +40,7 @@ import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.protocol.exceptions.NotReplicatedException;
 import org.apache.ratis.protocol.exceptions.ReconfigurationTimeoutException;
 import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.ReadRequests.AppendEntriesListener;
+import org.apache.ratis.server.impl.ReadIndexHeartbeats.AppendEntriesListener;
 import org.apache.ratis.server.leader.FollowerInfo;
 import org.apache.ratis.server.leader.LeaderState;
 import org.apache.ratis.server.leader.LogAppender;
@@ -260,6 +260,8 @@ class LeaderStateImpl implements LeaderState {
   private final long followerMaxGapThreshold;
   private final PendingStepDown pendingStepDown;
 
+  private final ReadIndexHeartbeats readIndexHeartbeats;
+
   LeaderStateImpl(RaftServerImpl server) {
     this.name = server.getMemberId() + "-" + JavaUtils.getClassSimpleName(getClass());
     this.server = server;
@@ -279,6 +281,7 @@ class LeaderStateImpl implements LeaderState {
     this.watchRequests = new WatchRequests(server.getMemberId(), properties);
     this.messageStreamRequests = new MessageStreamRequests(server.getMemberId());
     this.pendingStepDown = new PendingStepDown(this);
+    this.readIndexHeartbeats = new ReadIndexHeartbeats();
     long maxPendingRequests = RaftServerConfigKeys.Write.elementLimit(properties);
     double followerGapRatioMax = RaftServerConfigKeys.Write.followerGapRatioMax(properties);
 
@@ -337,6 +340,8 @@ class LeaderStateImpl implements LeaderState {
       LOG.warn("{}: Caught exception in sendNotLeaderResponses", this, e);
     }
     messageStreamRequests.clear();
+    // TODO client should retry on NotLeaderException
+    readIndexHeartbeats.failListeners(nle);
     server.getServerRpc().notifyNotLeader(server.getMemberId().getGroupId());
     logAppenderMetrics.unregister();
     raftServerMetrics.unregister();
@@ -1087,7 +1092,7 @@ class LeaderStateImpl implements LeaderState {
 
     final MemoizedSupplier<AppendEntriesListener> supplier = MemoizedSupplier.valueOf(
         () -> new AppendEntriesListener(readIndex));
-    final AppendEntriesListener listener = server.getReadRequests().addAppendEntriesListener(
+    final AppendEntriesListener listener = readIndexHeartbeats.addAppendEntriesListener(
         readIndex, key -> supplier.get());
 
     // the readIndex is already acknowledged before
@@ -1110,7 +1115,7 @@ class LeaderStateImpl implements LeaderState {
 
   @Override
   public void onAppendEntriesReply(LogAppender appender, RaftProtos.AppendEntriesReplyProto reply) {
-    server.getReadRequests().onAppendEntriesReply(appender, reply, this::hasMajority);
+    readIndexHeartbeats.onAppendEntriesReply(appender, reply, this::hasMajority);
   }
 
   void replyPendingRequest(long logIndex, RaftClientReply reply) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java
similarity index 67%
copy from ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
copy to ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java
index bbd2a92ba..40c559b2f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java
@@ -17,37 +17,25 @@
  */
 package org.apache.ratis.server.impl;
 
-import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
 import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.protocol.exceptions.ReadException;
-import org.apache.ratis.protocol.exceptions.TimeoutIOException;
-import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.leader.LogAppender;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.raftlog.RaftLogIndex;
-import org.apache.ratis.statemachine.StateMachine;
-import org.apache.ratis.util.MemoizedSupplier;
-import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.TimeoutExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Iterator;
 import java.util.Map;
 import java.util.NavigableMap;
-import java.util.Optional;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
 
-/** For supporting linearizable read. */
-class ReadRequests {
-  private static final Logger LOG = LoggerFactory.getLogger(ReadRequests.class);
+class ReadIndexHeartbeats {
+  private static final Logger LOG = LoggerFactory.getLogger(ReadIndexHeartbeats.class);
 
   /** The acknowledgement from a {@link LogAppender} of a heartbeat for a particular call id. */
   static class HeartbeatAck {
@@ -151,66 +139,23 @@ class ReadRequests {
         }
 
         if (listener.receive(appender, reply, hasMajority)) {
-          ackedCommitIndex.updateToMax(listener.commitIndex, s -> LOG.debug("{}: {}", ReadRequests.this, s));
+          ackedCommitIndex.updateToMax(listener.commitIndex, s -> LOG.debug("{}: {}", this, s));
           iterator.remove();
         }
       }
     }
-  }
-
-  static class ReadIndexQueue {
-    private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
-    private final NavigableMap<Long, CompletableFuture<Long>> sorted = new ConcurrentSkipListMap<>();
-    private final TimeDuration readTimeout;
-
-    ReadIndexQueue(TimeDuration readTimeout) {
-      this.readTimeout = readTimeout;
-    }
-
-    CompletableFuture<Long> add(long readIndex) {
-      final MemoizedSupplier<CompletableFuture<Long>> supplier = MemoizedSupplier.valueOf(CompletableFuture::new);
-      final CompletableFuture<Long> f = sorted.computeIfAbsent(readIndex, i -> supplier.get());
-
-      if (supplier.isInitialized()) {
-        scheduler.onTimeout(readTimeout, () -> handleTimeout(readIndex),
-            LOG, () -> "Failed to handle read timeout for index " + readIndex);
-      }
-      return f;
-    }
-
-    private void handleTimeout(long readIndex) {
-      Optional.ofNullable(sorted.remove(readIndex)).ifPresent(consumer -> {
-        consumer.completeExceptionally(
-          new ReadException(new TimeoutIOException("Read timeout for index " + readIndex)));
-      });
-    }
 
-    void complete(Long appliedIndex) {
-      for(;;) {
-        if (sorted.isEmpty()) {
-          return;
-        }
-        final Long first = sorted.firstKey();
-        if (first == null || first > appliedIndex) {
-          return;
-        }
-        Optional.ofNullable(sorted.remove(first)).ifPresent(f -> f.complete(appliedIndex));
-      }
+    synchronized void failAll(Exception e) {
+      sorted.forEach((index, listener) -> listener.getFuture().completeExceptionally(e));
+      sorted.clear();
     }
   }
 
   private final AppendEntriesListeners appendEntriesListeners = new AppendEntriesListeners();
   private final RaftLogIndex ackedCommitIndex = new RaftLogIndex("ackedCommitIndex", RaftLog.INVALID_LOG_INDEX);
-  private final ReadIndexQueue readIndexQueue;
-  private final StateMachine stateMachine;
-
-  ReadRequests(RaftProperties properties, StateMachine stateMachine) {
-    this.readIndexQueue = new ReadIndexQueue(RaftServerConfigKeys.Read.timeout(properties));
-    this.stateMachine = stateMachine;
-  }
 
   AppendEntriesListener addAppendEntriesListener(long commitIndex,
-                                                 Function<Long, AppendEntriesListener> constructor) {
+                                                              Function<Long, AppendEntriesListener> constructor) {
     if (commitIndex <= ackedCommitIndex.get()) {
       return null;
     }
@@ -222,14 +167,7 @@ class ReadRequests {
     appendEntriesListeners.onAppendEntriesReply(appender, reply, hasMajority);
   }
 
-  Consumer<Long> getAppliedIndexConsumer() {
-    return readIndexQueue::complete;
-  }
-
-  CompletableFuture<Long> waitToAdvance(long readIndex) {
-    if (stateMachine.getLastAppliedTermIndex().getIndex() >= readIndex) {
-      return CompletableFuture.completedFuture(readIndex);
-    }
-    return readIndexQueue.add(readIndex);
+  void failListeners(Exception e) {
+    appendEntriesListeners.failAll(e);
   }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
index bbd2a92ba..aadc45c12 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
@@ -18,14 +18,9 @@
 package org.apache.ratis.server.impl;
 
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
-import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.exceptions.ReadException;
 import org.apache.ratis.protocol.exceptions.TimeoutIOException;
 import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.leader.LogAppender;
-import org.apache.ratis.server.raftlog.RaftLog;
-import org.apache.ratis.server.raftlog.RaftLogIndex;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.TimeDuration;
@@ -33,131 +28,16 @@ import org.apache.ratis.util.TimeoutExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Iterator;
-import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Optional;
-import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.function.Predicate;
 
 /** For supporting linearizable read. */
 class ReadRequests {
   private static final Logger LOG = LoggerFactory.getLogger(ReadRequests.class);
 
-  /** The acknowledgement from a {@link LogAppender} of a heartbeat for a particular call id. */
-  static class HeartbeatAck {
-    private final LogAppender appender;
-    private final long minCallId;
-    private volatile boolean acknowledged = false;
-
-    HeartbeatAck(LogAppender appender) {
-      this.appender = appender;
-      this.minCallId = appender.getCallId();
-    }
-
-    /** Is the heartbeat (for a particular call id) acknowledged? */
-    boolean isAcknowledged() {
-      return acknowledged;
-    }
-
-    /**
-     * @return true if the acknowledged state is changed from false to true;
-     *         otherwise, the acknowledged state remains unchanged, return false.
-     */
-    boolean receive(AppendEntriesReplyProto reply) {
-      if (acknowledged) {
-        return false;
-      }
-      synchronized (this) {
-        if (!acknowledged && isValid(reply)) {
-          acknowledged = true;
-          return true;
-        }
-        return false;
-      }
-    }
-
-    private boolean isValid(AppendEntriesReplyProto reply) {
-      if (reply == null || !reply.getServerReply().getSuccess()) {
-        return false;
-      }
-      // valid only if the reply has a later call id than the min.
-      return appender.getCallIdComparator().compare(reply.getServerReply().getCallId(), minCallId) >= 0;
-    }
-  }
-
-  static class AppendEntriesListener {
-    private final long commitIndex;
-    private final CompletableFuture<Long> future = new CompletableFuture<>();
-    private final ConcurrentHashMap<RaftPeerId, HeartbeatAck> replies = new ConcurrentHashMap<>();
-
-    AppendEntriesListener(long commitIndex) {
-      this.commitIndex = commitIndex;
-    }
-
-    CompletableFuture<Long> getFuture() {
-      return future;
-    }
-
-    boolean receive(LogAppender logAppender, AppendEntriesReplyProto proto,
-                    Predicate<Predicate<RaftPeerId>> hasMajority) {
-      if (isCompletedNormally()) {
-        return true;
-      }
-
-      final HeartbeatAck reply = replies.computeIfAbsent(
-          logAppender.getFollowerId(), key -> new HeartbeatAck(logAppender));
-      if (reply.receive(proto)) {
-        if (hasMajority.test(id -> replies.get(id).isAcknowledged())) {
-          future.complete(commitIndex);
-          return true;
-        }
-      }
-
-      return isCompletedNormally();
-    }
-
-    boolean isCompletedNormally() {
-      return future.isDone() && !future.isCancelled() && !future.isCompletedExceptionally();
-    }
-  }
-
-  class AppendEntriesListeners {
-    private final NavigableMap<Long, AppendEntriesListener> sorted = new TreeMap<>();
-
-    synchronized AppendEntriesListener add(long commitIndex, Function<Long, AppendEntriesListener> constructor) {
-      return sorted.computeIfAbsent(commitIndex, constructor);
-    }
-
-    synchronized void onAppendEntriesReply(LogAppender appender, AppendEntriesReplyProto reply,
-                                           Predicate<Predicate<RaftPeerId>> hasMajority) {
-      final long callId = reply.getServerReply().getCallId();
-
-      Iterator<Map.Entry<Long, AppendEntriesListener>> iterator = sorted.entrySet().iterator();
-      while (iterator.hasNext()) {
-        Map.Entry<Long, AppendEntriesListener> entry = iterator.next();
-        if (entry.getKey() > callId) {
-          return;
-        }
-
-        final AppendEntriesListener listener = entry.getValue();
-        if (listener == null) {
-          continue;
-        }
-
-        if (listener.receive(appender, reply, hasMajority)) {
-          ackedCommitIndex.updateToMax(listener.commitIndex, s -> LOG.debug("{}: {}", ReadRequests.this, s));
-          iterator.remove();
-        }
-      }
-    }
-  }
-
   static class ReadIndexQueue {
     private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
     private final NavigableMap<Long, CompletableFuture<Long>> sorted = new ConcurrentSkipListMap<>();
@@ -199,8 +79,6 @@ class ReadRequests {
     }
   }
 
-  private final AppendEntriesListeners appendEntriesListeners = new AppendEntriesListeners();
-  private final RaftLogIndex ackedCommitIndex = new RaftLogIndex("ackedCommitIndex", RaftLog.INVALID_LOG_INDEX);
   private final ReadIndexQueue readIndexQueue;
   private final StateMachine stateMachine;
 
@@ -209,19 +87,6 @@ class ReadRequests {
     this.stateMachine = stateMachine;
   }
 
-  AppendEntriesListener addAppendEntriesListener(long commitIndex,
-                                                 Function<Long, AppendEntriesListener> constructor) {
-    if (commitIndex <= ackedCommitIndex.get()) {
-      return null;
-    }
-    return appendEntriesListeners.add(commitIndex, constructor);
-  }
-
-  void onAppendEntriesReply(LogAppender appender, AppendEntriesReplyProto reply,
-                            Predicate<Predicate<RaftPeerId>> hasMajority) {
-    appendEntriesListeners.onAppendEntriesReply(appender, reply, hasMajority);
-  }
-
   Consumer<Long> getAppliedIndexConsumer() {
     return readIndexQueue::complete;
   }
diff --git a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
index f611ac4d1..61d43d85b 100644
--- a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
@@ -124,32 +124,18 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends MiniRaftCluster>
 
         RaftClientReply reply = client.io().send(incrementMessage);
         Assert.assertTrue(reply.isSuccess());
-        Semaphore canRead = new Semaphore(0);
 
-        // this future will complete after 500 ms
-        Thread thread = new Thread(() -> {
-          try {
-            RaftClientReply staleValueBefore = client.io()
+        CompletableFuture<RaftClientReply> result = client.async().send(waitAndIncrementMessage);
+        Thread.sleep(100);
+
+        RaftClientReply staleValueBefore = client.io()
                 .sendStaleRead(queryMessage, 0, leaderId);
 
-            Assert.assertEquals(retrieve(staleValueBefore), 1);
-
-            canRead.acquire();
-            // we still have to sleep for a while to guarantee that the async write arrives at RaftServer
-            Thread.sleep(100);
-            // send a linearizable read request
-            // linearizable read will wait the statemachine to advance
-            RaftClientReply linearizableReadValue = client.io()
-                .sendReadOnly(queryMessage);
-            Assert.assertEquals(retrieve(linearizableReadValue), 2);
-          }
-          catch (Exception ignored) {}
-        });
+        Assert.assertEquals(retrieve(staleValueBefore), 1);
 
-        thread.start();
-        CompletableFuture<RaftClientReply> result = client.async().send(waitAndIncrementMessage);
-        canRead.release();
-        thread.join();
+        RaftClientReply linearizableReadValue = client.io()
+            .sendReadOnly(queryMessage);
+        Assert.assertEquals(retrieve(linearizableReadValue), 2);
 
       }
     } finally {
@@ -231,6 +217,7 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends MiniRaftCluster>
 
         leaderClient.io().send(incrementMessage);
         leaderClient.async().send(waitAndIncrementMessage);
+        Thread.sleep(100);
 
         RaftClientReply clientReply = followerClient1.io().sendReadOnly(queryMessage, followers.get(0).getId());
         Assert.assertEquals(2, retrieve(clientReply));

Reply via email to