This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch jira3167
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6a4febe4c58d06a789ddf2a37fcf870fd69c8cec
Author: LebronAl <[email protected]>
AuthorDate: Wed May 11 20:23:59 2022 +0800

    finish
---
 confignode/src/assembly/resources/conf/logback.xml |  2 +-
 .../iotdb/consensus/ratis/RatisConsensus.java      | 92 ++++++++++++++++------
 .../commons/client/ClientFactoryProperty.java      |  2 +-
 3 files changed, 70 insertions(+), 26 deletions(-)

diff --git a/confignode/src/assembly/resources/conf/logback.xml 
b/confignode/src/assembly/resources/conf/logback.xml
index e5181c56dd..9c538adb4d 100644
--- a/confignode/src/assembly/resources/conf/logback.xml
+++ b/confignode/src/assembly/resources/conf/logback.xml
@@ -136,5 +136,5 @@
         <appender-ref ref="stdout"/>
     </root>
     <logger level="info" name="org.apache.iotdb.confignode"/>
-    <logger level="warn" name="org.apache.ratis"/>
+    <logger level="info" name="org.apache.ratis"/>
 </configuration>
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java 
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 8d644bbc81..f714b2bf65 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -58,6 +58,7 @@ import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.server.DivisionInfo;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.util.NetUtils;
@@ -72,6 +73,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
@@ -103,6 +105,9 @@ class RatisConsensus implements IConsensus {
   private static final int DEFAULT_PRIORITY = 0;
   private static final int LEADER_PRIORITY = 1;
 
+  // TODO make it configurable
+  private static final int DEFAULT_WAIT_LEADER_READY_TIMEOUT = (int) 
TimeUnit.SECONDS.toMillis(20);
+
   /**
    * @param ratisStorageDir different groups of RatisConsensus Peer all share 
ratisStorageDir as
    *     root dir
@@ -149,13 +154,13 @@ class RatisConsensus implements IConsensus {
    */
   @Override
   public ConsensusWriteResponse write(
-      ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
+      ConsensusGroupId consensusGroupId, IConsensusRequest IConsensusRequest) {
 
     // pre-condition: group exists and myself server serves this group
-    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
+    RaftGroupId raftGroupId = 
Utils.fromConsensusGroupIdToRaftGroupId(consensusGroupId);
     RaftGroup raftGroup = getGroupInfo(raftGroupId);
     if (raftGroup == null || !raftGroup.getPeers().contains(myself)) {
-      return failedWrite(new ConsensusGroupNotExistException(groupId));
+      return failedWrite(new 
ConsensusGroupNotExistException(consensusGroupId));
     }
 
     // serialize request into Message
@@ -163,22 +168,24 @@ class RatisConsensus implements IConsensus {
 
     // 1. first try the local server
     RaftClientRequest clientRequest =
-        buildRawRequest(groupId, message, 
RaftClientRequest.writeRequestType());
+        buildRawRequest(raftGroupId, message, 
RaftClientRequest.writeRequestType());
     RaftClientReply localServerReply;
     RaftPeer suggestedLeader = null;
-    try {
-      localServerReply = server.submitClientRequest(clientRequest);
-      if (localServerReply.isSuccess()) {
-        ResponseMessage responseMessage = (ResponseMessage) 
localServerReply.getMessage();
-        TSStatus writeStatus = (TSStatus) responseMessage.getContentHolder();
-        return 
ConsensusWriteResponse.newBuilder().setStatus(writeStatus).build();
+    if (isLeader(consensusGroupId) && waitUntilLeaderReady(raftGroupId)) {
+      try {
+        localServerReply = server.submitClientRequest(clientRequest);
+        if (localServerReply.isSuccess()) {
+          ResponseMessage responseMessage = (ResponseMessage) 
localServerReply.getMessage();
+          TSStatus writeStatus = (TSStatus) responseMessage.getContentHolder();
+          return 
ConsensusWriteResponse.newBuilder().setStatus(writeStatus).build();
+        }
+        NotLeaderException ex = localServerReply.getNotLeaderException();
+        if (ex != null) { // local server is not leader
+          suggestedLeader = ex.getSuggestedLeader();
+        }
+      } catch (IOException e) {
+        return failedWrite(new RatisRequestFailedException(e));
       }
-      NotLeaderException ex = localServerReply.getNotLeaderException();
-      if (ex != null) { // local server is not leader
-        suggestedLeader = ex.getSuggestedLeader();
-      }
-    } catch (IOException e) {
-      return failedWrite(new RatisRequestFailedException(e));
     }
 
     // 2. try raft client
@@ -209,18 +216,20 @@ class RatisConsensus implements IConsensus {
 
   /** Read directly from LOCAL COPY notice: May read stale data (not 
linearizable) */
   @Override
-  public ConsensusReadResponse read(ConsensusGroupId groupId, 
IConsensusRequest IConsensusRequest) {
-
-    RaftGroup group = 
getGroupInfo(Utils.fromConsensusGroupIdToRaftGroupId(groupId));
+  public ConsensusReadResponse read(
+      ConsensusGroupId consensusGroupId, IConsensusRequest IConsensusRequest) {
+    RaftGroupId groupId = 
Utils.fromConsensusGroupIdToRaftGroupId(consensusGroupId);
+    RaftGroup group = getGroupInfo(groupId);
     if (group == null || !group.getPeers().contains(myself)) {
-      return failedRead(new ConsensusGroupNotExistException(groupId));
+      return failedRead(new ConsensusGroupNotExistException(consensusGroupId));
     }
 
     RaftClientReply reply;
     try {
       RequestMessage message = new RequestMessage(IConsensusRequest);
       RaftClientRequest clientRequest =
-          buildRawRequest(groupId, message, 
RaftClientRequest.staleReadRequestType(-1));
+          buildRawRequest(
+              groupId, message, 
RaftClientRequest.staleReadRequestType(getCommitIndex(groupId)));
       reply = server.submitClientRequest(clientRequest);
       if (!reply.isSuccess()) {
         return failedRead(new 
RatisRequestFailedException(reply.getException()));
@@ -476,6 +485,32 @@ class RatisConsensus implements IConsensus {
     return isLeader;
   }
 
+  private boolean waitUntilLeaderReady(RaftGroupId groupId) {
+    DivisionInfo divisionInfo;
+    try {
+      divisionInfo = server.getDivision(groupId).getInfo();
+    } catch (IOException e) {
+      // if the query fails, simply return not leader
+      logger.info("isLeaderReady checking failed with exception: ", e);
+      return false;
+    }
+    long startTime = System.currentTimeMillis();
+    try {
+      while (divisionInfo.isLeader() && !divisionInfo.isLeaderReady()) {
+        Thread.sleep(100);
+        long consumedTime = System.currentTimeMillis() - startTime;
+        if (consumedTime >= DEFAULT_WAIT_LEADER_READY_TIMEOUT) {
+          logger.warn("{}: leader is still not ready after {}ms", groupId, 
consumedTime);
+          return false;
+        }
+      }
+    } catch (InterruptedException e) {
+      logger.warn("Unexpected interruption", e);
+      return false;
+    }
+    return divisionInfo.isLeader();
+  }
+
   @Override
   public Peer getLeader(ConsensusGroupId groupId) {
     if (isLeader(groupId)) {
@@ -512,12 +547,12 @@ class RatisConsensus implements IConsensus {
   }
 
   private RaftClientRequest buildRawRequest(
-      ConsensusGroupId groupId, Message message, RaftClientRequest.Type type) {
+      RaftGroupId groupId, Message message, RaftClientRequest.Type type) {
     return RaftClientRequest.newBuilder()
         .setServerId(server.getId())
         .setClientId(localFakeId)
         .setCallId(localFakeCallId.incrementAndGet())
-        .setGroupId(Utils.fromConsensusGroupIdToRaftGroupId(groupId))
+        .setGroupId(groupId)
         .setType(type)
         .setMessage(message)
         .build();
@@ -534,11 +569,20 @@ class RatisConsensus implements IConsensus {
         lastSeen.put(raftGroupId, raftGroup);
       }
     } catch (IOException e) {
-      logger.debug("get group failed ", e);
+      logger.debug("get group {} failed ", raftGroupId, e);
     }
     return raftGroup;
   }
 
+  private long getCommitIndex(RaftGroupId raftGroupId) {
+    try {
+      return 
server.getDivision(raftGroupId).getRaftLog().getLastCommittedIndex();
+    } catch (IOException e) {
+      logger.debug("get group {} failed ", raftGroupId, e);
+    }
+    return -1;
+  }
+
   private RaftGroup buildRaftGroup(ConsensusGroupId groupId, List<Peer> peers) 
{
     return RaftGroup.valueOf(
         Utils.fromConsensusGroupIdToRaftGroupId(groupId),
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientFactoryProperty.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientFactoryProperty.java
index f1801628e4..2ff20065e8 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientFactoryProperty.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientFactoryProperty.java
@@ -89,7 +89,7 @@ public class ClientFactoryProperty {
     private DefaultProperty() {}
 
     public static final boolean RPC_THRIFT_COMPRESSED_ENABLED = false;
-    public static final int CONNECTION_TIMEOUT_MS = (int) 
TimeUnit.SECONDS.toMillis(20);;
+    public static final int CONNECTION_TIMEOUT_MS = (int) 
TimeUnit.SECONDS.toMillis(20);
     public static final int SELECTOR_NUM_OF_ASYNC_CLIENT_MANAGER = 1;
   }
 }

Reply via email to