This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/expr by this push:
new a4a21bb add raft group id in return message
a4a21bb is described below
commit a4a21bb497c8ec63cd5bf16fbabc3e81c90af9c5
Author: jt <[email protected]>
AuthorDate: Fri Dec 24 14:34:11 2021 +0800
add raft group id in return message
---
.../cluster/server/member/DataGroupMember.java | 3 ++-
.../iotdb/cluster/server/member/RaftMember.java | 29 +++++++++++---------
.../apache/iotdb/session/SessionConnection.java | 31 +++++++++++++++++-----
3 files changed, 42 insertions(+), 21 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 8ef416b..a6b585d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -782,7 +782,8 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
private TSStatus executeNonQueryPlanWithKnownLeader(PhysicalPlan plan) {
if (character == NodeCharacter.LEADER) {
if (plan.getTargetedTerm() > 0 && plan.getTargetedTerm() != term.get()) {
- return
StatusUtils.getStatus(TSStatusCode.LEADER_CHANGED).setMessage(term.get() + "");
+ return StatusUtils.getStatus(TSStatusCode.LEADER_CHANGED)
+ .setMessage(getRaftGroupFullId() + "-" + term.get());
}
long startTime =
Statistic.DATA_GROUP_MEMBER_LOCAL_EXECUTION.getOperationStartTime();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 287330c..f961cd8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -1197,8 +1197,8 @@ public abstract class RaftMember implements RaftMemberMBean {
Statistic.LOG_DISPATCHER_FROM_CREATE_TO_OK.calOperationCostTimeFromStart(
log.getCreateTime());
Statistic.LOG_DISPATCHER_TOTAL.calOperationCostTimeFromStart(totalStartTime);
- return StatusUtils.getStatus(TSStatusCode.WEAKLY_ACCEPTED)
- .setMessage(log.getCurrLogIndex() + "-" + log.getCurrLogTerm());
+ return includeLogNumbersInStatus(
+ StatusUtils.getStatus(TSStatusCode.WEAKLY_ACCEPTED), log);
case OK:
logger.debug(MSG_LOG_IS_ACCEPTED, name, log);
startTime =
Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
@@ -1207,9 +1207,7 @@ public abstract class RaftMember implements RaftMemberMBean {
Statistic.LOG_DISPATCHER_FROM_CREATE_TO_OK.calOperationCostTimeFromStart(
log.getCreateTime());
Statistic.LOG_DISPATCHER_TOTAL.calOperationCostTimeFromStart(totalStartTime);
- return StatusUtils.OK
- .deepCopy()
- .setMessage(log.getCurrLogIndex() + "-" + log.getCurrLogTerm());
+ return includeLogNumbersInStatus(StatusUtils.OK.deepCopy(), log);
case TIME_OUT:
logger.debug("{}: log {} timed out...", name, log);
break;
@@ -1224,6 +1222,11 @@ public abstract class RaftMember implements RaftMemberMBean {
return StatusUtils.TIME_OUT;
}
+ private TSStatus includeLogNumbersInStatus(TSStatus status, Log log) {
+ return status.setMessage(
+ getRaftGroupFullId() + "-" + log.getCurrLogIndex() + "-" +
log.getCurrLogTerm());
+ }
+
public SendLogRequest buildSendLogRequest(Log log) {
VotingLog votingLog = buildVotingLog(log);
AtomicBoolean leaderShipStale = new AtomicBoolean(false);
@@ -1898,9 +1901,7 @@ public abstract class RaftMember implements RaftMemberMBean {
logger.debug(MSG_LOG_IS_ACCEPTED, name, log);
commitLog(log.getLog());
Timer.Statistic.RAFT_SENDER_COMMIT_LOG.calOperationCostTimeFromStart(startTime);
- return StatusUtils.OK
- .deepCopy()
- .setMessage(log.getLog().getCurrLogIndex() + "-" +
log.getLog().getCurrLogTerm());
+ return includeLogNumbersInStatus(StatusUtils.OK.deepCopy(),
log.getLog());
}
int retryTime = 0;
@@ -1920,17 +1921,15 @@ public abstract class RaftMember implements RaftMemberMBean {
Statistic.LOG_DISPATCHER_FROM_CREATE_TO_OK.calOperationCostTimeFromStart(
log.getLog().getCreateTime());
Statistic.LOG_DISPATCHER_TOTAL.calOperationCostTimeFromStart(totalStartTime);
- return StatusUtils.getStatus(TSStatusCode.WEAKLY_ACCEPTED)
- .setMessage(log.getLog().getCurrLogIndex() + "-" +
log.getLog().getCurrLogTerm());
+ return includeLogNumbersInStatus(
+ StatusUtils.getStatus(TSStatusCode.WEAKLY_ACCEPTED),
log.getLog());
case OK:
startTime =
Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
logger.debug(MSG_LOG_IS_ACCEPTED, name, log);
commitLog(log.getLog());
Timer.Statistic.RAFT_SENDER_COMMIT_LOG.calOperationCostTimeFromStart(startTime);
Statistic.LOG_DISPATCHER_TOTAL.calOperationCostTimeFromStart(totalStartTime);
- return StatusUtils.OK
- .deepCopy()
- .setMessage(log.getLog().getCurrLogIndex() + "-" +
log.getLog().getCurrLogTerm());
+ return includeLogNumbersInStatus(StatusUtils.OK.deepCopy(),
log.getLog());
case TIME_OUT:
logger.debug("{}: log {} timed out, retrying...", name, log);
try {
@@ -2282,4 +2281,8 @@ public abstract class RaftMember implements RaftMemberMBean {
public String getLastCatchUpResponseTimeAsString() {
return lastCatchUpResponseTime.toString();
}
+
+ public String getRaftGroupFullId() {
+ return (getHeader() != null ? getHeader().node.nodeIdentifier : 0) + "#" +
getRaftGroupId();
+ }
}
diff --git
a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index ae468a7..d82f78b 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -66,7 +66,9 @@ import org.slf4j.LoggerFactory;
import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Random;
public class SessionConnection {
@@ -84,7 +86,7 @@ public class SessionConnection {
private List<EndPoint> endPointList = new ArrayList<>();
private boolean enableRedirect = false;
- private List<UnconfirmedRequest> unconfirmedRequests = new ArrayList<>();
+ private Map<String, List<UnconfirmedRequest>> groupUnconfirmedRequests = new
HashMap<>();
private long latestTerm;
// TestOnly
@@ -984,8 +986,11 @@ public class SessionConnection {
}
try {
String[] split = message.split("-");
- long index = Long.parseLong(split[0]);
- long term = Long.parseLong(split[1]);
+ String raftGroupId = split[0];
+ long index = Long.parseLong(split[1]);
+ long term = Long.parseLong(split[2]);
+ List<UnconfirmedRequest> unconfirmedRequests =
+ groupUnconfirmedRequests.computeIfAbsent(raftGroupId, r -> new
ArrayList<>());
if (!unconfirmedRequests.isEmpty() && unconfirmedRequests.get(0).term ==
term) {
unconfirmedRequests.removeIf(r -> r.index < index);
}
@@ -996,13 +1001,16 @@ public class SessionConnection {
}
private void onWeaklyAccepted(String message, Object request) {
- if (message == null) {
+ if (message == null || message.isEmpty()) {
return;
}
try {
String[] split = message.split("-");
- long index = Long.parseLong(split[0]);
- long term = Long.parseLong(split[1]);
+ String raftGroupId = split[0];
+ long index = Long.parseLong(split[1]);
+ long term = Long.parseLong(split[2]);
+ List<UnconfirmedRequest> unconfirmedRequests =
+ groupUnconfirmedRequests.computeIfAbsent(raftGroupId, r -> new
ArrayList<>());
UnconfirmedRequest unconfirmedRequest = new UnconfirmedRequest();
unconfirmedRequest.index = index;
unconfirmedRequest.term = term;
@@ -1019,7 +1027,11 @@ public class SessionConnection {
return;
}
try {
- latestTerm = Long.parseLong(message);
+ String[] split = message.split("-");
+ String raftGroupId = split[0];
+ latestTerm = Long.parseLong(split[1]);
+ List<UnconfirmedRequest> unconfirmedRequests =
+ groupUnconfirmedRequests.computeIfAbsent(raftGroupId, r -> new
ArrayList<>());
List<UnconfirmedRequest> tempUnconfirmedRequests = new
ArrayList<>(unconfirmedRequests);
unconfirmedRequests.clear();
for (UnconfirmedRequest tempUnconfirmedRequest :
tempUnconfirmedRequests) {
@@ -1061,5 +1073,10 @@ public class SessionConnection {
private long index;
private long term;
private Object request;
+
+ @Override
+ public String toString() {
+ return index + "-" + term;
+ }
}
}