This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/expr by this push:
     new a4a21bb  add raft group id in return message
a4a21bb is described below

commit a4a21bb497c8ec63cd5bf16fbabc3e81c90af9c5
Author: jt <[email protected]>
AuthorDate: Fri Dec 24 14:34:11 2021 +0800

    add raft group id in return message
---
 .../cluster/server/member/DataGroupMember.java     |  3 ++-
 .../iotdb/cluster/server/member/RaftMember.java    | 29 +++++++++++---------
 .../apache/iotdb/session/SessionConnection.java    | 31 +++++++++++++++++-----
 3 files changed, 42 insertions(+), 21 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 8ef416b..a6b585d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -782,7 +782,8 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
   private TSStatus executeNonQueryPlanWithKnownLeader(PhysicalPlan plan) {
     if (character == NodeCharacter.LEADER) {
       if (plan.getTargetedTerm() > 0 && plan.getTargetedTerm() != term.get()) {
-        return 
StatusUtils.getStatus(TSStatusCode.LEADER_CHANGED).setMessage(term.get() + "");
+        return StatusUtils.getStatus(TSStatusCode.LEADER_CHANGED)
+            .setMessage(getRaftGroupFullId() + "-" + term.get());
       }
 
       long startTime = 
Statistic.DATA_GROUP_MEMBER_LOCAL_EXECUTION.getOperationStartTime();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 287330c..f961cd8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -1197,8 +1197,8 @@ public abstract class RaftMember implements RaftMemberMBean {
           
Statistic.LOG_DISPATCHER_FROM_CREATE_TO_OK.calOperationCostTimeFromStart(
               log.getCreateTime());
           
Statistic.LOG_DISPATCHER_TOTAL.calOperationCostTimeFromStart(totalStartTime);
-          return StatusUtils.getStatus(TSStatusCode.WEAKLY_ACCEPTED)
-              .setMessage(log.getCurrLogIndex() + "-" + log.getCurrLogTerm());
+          return includeLogNumbersInStatus(
+              StatusUtils.getStatus(TSStatusCode.WEAKLY_ACCEPTED), log);
         case OK:
           logger.debug(MSG_LOG_IS_ACCEPTED, name, log);
           startTime = 
Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
@@ -1207,9 +1207,7 @@ public abstract class RaftMember implements RaftMemberMBean {
           
Statistic.LOG_DISPATCHER_FROM_CREATE_TO_OK.calOperationCostTimeFromStart(
               log.getCreateTime());
           
Statistic.LOG_DISPATCHER_TOTAL.calOperationCostTimeFromStart(totalStartTime);
-          return StatusUtils.OK
-              .deepCopy()
-              .setMessage(log.getCurrLogIndex() + "-" + log.getCurrLogTerm());
+          return includeLogNumbersInStatus(StatusUtils.OK.deepCopy(), log);
         case TIME_OUT:
           logger.debug("{}: log {} timed out...", name, log);
           break;
@@ -1224,6 +1222,11 @@ public abstract class RaftMember implements RaftMemberMBean {
     return StatusUtils.TIME_OUT;
   }
 
+  private TSStatus includeLogNumbersInStatus(TSStatus status, Log log) {
+    return status.setMessage(
+        getRaftGroupFullId() + "-" + log.getCurrLogIndex() + "-" + 
log.getCurrLogTerm());
+  }
+
   public SendLogRequest buildSendLogRequest(Log log) {
     VotingLog votingLog = buildVotingLog(log);
     AtomicBoolean leaderShipStale = new AtomicBoolean(false);
@@ -1898,9 +1901,7 @@ public abstract class RaftMember implements RaftMemberMBean {
       logger.debug(MSG_LOG_IS_ACCEPTED, name, log);
       commitLog(log.getLog());
       
Timer.Statistic.RAFT_SENDER_COMMIT_LOG.calOperationCostTimeFromStart(startTime);
-      return StatusUtils.OK
-          .deepCopy()
-          .setMessage(log.getLog().getCurrLogIndex() + "-" + 
log.getLog().getCurrLogTerm());
+      return includeLogNumbersInStatus(StatusUtils.OK.deepCopy(), 
log.getLog());
     }
 
     int retryTime = 0;
@@ -1920,17 +1921,15 @@ public abstract class RaftMember implements RaftMemberMBean {
           
Statistic.LOG_DISPATCHER_FROM_CREATE_TO_OK.calOperationCostTimeFromStart(
               log.getLog().getCreateTime());
           
Statistic.LOG_DISPATCHER_TOTAL.calOperationCostTimeFromStart(totalStartTime);
-          return StatusUtils.getStatus(TSStatusCode.WEAKLY_ACCEPTED)
-              .setMessage(log.getLog().getCurrLogIndex() + "-" + 
log.getLog().getCurrLogTerm());
+          return includeLogNumbersInStatus(
+              StatusUtils.getStatus(TSStatusCode.WEAKLY_ACCEPTED), 
log.getLog());
         case OK:
           startTime = 
Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
           logger.debug(MSG_LOG_IS_ACCEPTED, name, log);
           commitLog(log.getLog());
           
Timer.Statistic.RAFT_SENDER_COMMIT_LOG.calOperationCostTimeFromStart(startTime);
           
Statistic.LOG_DISPATCHER_TOTAL.calOperationCostTimeFromStart(totalStartTime);
-          return StatusUtils.OK
-              .deepCopy()
-              .setMessage(log.getLog().getCurrLogIndex() + "-" + 
log.getLog().getCurrLogTerm());
+          return includeLogNumbersInStatus(StatusUtils.OK.deepCopy(), 
log.getLog());
         case TIME_OUT:
           logger.debug("{}: log {} timed out, retrying...", name, log);
           try {
@@ -2282,4 +2281,8 @@ public abstract class RaftMember implements RaftMemberMBean {
   public String getLastCatchUpResponseTimeAsString() {
     return lastCatchUpResponseTime.toString();
   }
+
+  public String getRaftGroupFullId() {
+    return (getHeader() != null ? getHeader().node.nodeIdentifier : 0) + "#" + 
getRaftGroupId();
+  }
 }
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index ae468a7..d82f78b 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -66,7 +66,9 @@ import org.slf4j.LoggerFactory;
 
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
 public class SessionConnection {
@@ -84,7 +86,7 @@ public class SessionConnection {
   private List<EndPoint> endPointList = new ArrayList<>();
   private boolean enableRedirect = false;
 
-  private List<UnconfirmedRequest> unconfirmedRequests = new ArrayList<>();
+  private Map<String, List<UnconfirmedRequest>> groupUnconfirmedRequests = new 
HashMap<>();
   private long latestTerm;
 
   // TestOnly
@@ -984,8 +986,11 @@ public class SessionConnection {
     }
     try {
       String[] split = message.split("-");
-      long index = Long.parseLong(split[0]);
-      long term = Long.parseLong(split[1]);
+      String raftGroupId = split[0];
+      long index = Long.parseLong(split[1]);
+      long term = Long.parseLong(split[2]);
+      List<UnconfirmedRequest> unconfirmedRequests =
+          groupUnconfirmedRequests.computeIfAbsent(raftGroupId, r -> new 
ArrayList<>());
       if (!unconfirmedRequests.isEmpty() && unconfirmedRequests.get(0).term == 
term) {
         unconfirmedRequests.removeIf(r -> r.index < index);
       }
@@ -996,13 +1001,16 @@ public class SessionConnection {
   }
 
   private void onWeaklyAccepted(String message, Object request) {
-    if (message == null) {
+    if (message == null || message.isEmpty()) {
       return;
     }
     try {
       String[] split = message.split("-");
-      long index = Long.parseLong(split[0]);
-      long term = Long.parseLong(split[1]);
+      String raftGroupId = split[0];
+      long index = Long.parseLong(split[1]);
+      long term = Long.parseLong(split[2]);
+      List<UnconfirmedRequest> unconfirmedRequests =
+          groupUnconfirmedRequests.computeIfAbsent(raftGroupId, r -> new 
ArrayList<>());
       UnconfirmedRequest unconfirmedRequest = new UnconfirmedRequest();
       unconfirmedRequest.index = index;
       unconfirmedRequest.term = term;
@@ -1019,7 +1027,11 @@ public class SessionConnection {
       return;
     }
     try {
-      latestTerm = Long.parseLong(message);
+      String[] split = message.split("-");
+      String raftGroupId = split[0];
+      latestTerm = Long.parseLong(split[1]);
+      List<UnconfirmedRequest> unconfirmedRequests =
+          groupUnconfirmedRequests.computeIfAbsent(raftGroupId, r -> new 
ArrayList<>());
       List<UnconfirmedRequest> tempUnconfirmedRequests = new 
ArrayList<>(unconfirmedRequests);
       unconfirmedRequests.clear();
       for (UnconfirmedRequest tempUnconfirmedRequest : 
tempUnconfirmedRequests) {
@@ -1061,5 +1073,10 @@ public class SessionConnection {
     private long index;
     private long term;
     private Object request;
+
+    @Override
+    public String toString() {
+      return index + "-" + term;
+    }
   }
 }

Reply via email to