This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new 0cf0c1e028 fix leader check and forward
0cf0c1e028 is described below
commit 0cf0c1e028808d8c13031e7755c8fe2b8c4d8842
Author: Tian Jiang <[email protected]>
AuthorDate: Fri Feb 17 15:35:08 2023 +0800
fix leader check and forward
---
.../iotdb/consensus/natraft/protocol/RaftMember.java | 17 +++++++++++++++++
.../org/apache/iotdb/consensus/raft/ReplicateTest.java | 7 +------
2 files changed, 18 insertions(+), 6 deletions(-)
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index 8e2a3302c1..8ace88143f 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -567,11 +567,28 @@ public class RaftMember {
WEAK_ACCEPT
}
+ public boolean isLeader() {
+ return Objects.equals(status.leader.get(), thisNode);
+ }
+
public TSStatus processRequest(IConsensusRequest request) {
if (readOnly) {
return StatusUtils.NODE_READ_ONLY;
}
+ if (getLeader() == null) {
+ waitLeader();
+ }
+
+ if (!isLeader()) {
+ Peer leader = getLeader();
+ if (leader == null) {
+ return StatusUtils.NO_LEADER;
+ } else {
+ return forwardRequest(request, leader.getEndpoint(),
leader.getGroupId());
+ }
+ }
+
logger.debug("{}: Processing request {}", name, request);
Entry entry = new RequestEntry(request);
diff --git
a/consensus/src/test/java/org/apache/iotdb/consensus/raft/ReplicateTest.java
b/consensus/src/test/java/org/apache/iotdb/consensus/raft/ReplicateTest.java
index a4dc42c236..e4c59f6115 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/raft/ReplicateTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/raft/ReplicateTest.java
@@ -126,7 +126,7 @@ public class ReplicateTest {
* The three nodes use the requests in the queue to replicate the requests
to the other two nodes
*/
@Test
- public void ReplicateUsingQueueTest() throws IOException,
InterruptedException {
+ public void Replicate3NodeTest() throws IOException, InterruptedException {
logger.info("Start ReplicateUsingQueueTest");
servers.get(0).createPeer(group.getGroupId(), group.getPeers());
servers.get(1).createPeer(group.getGroupId(), group.getPeers());
@@ -206,13 +206,8 @@ public class ReplicateTest {
Assert.assertEquals(-1, servers.get(0).getMember(gid).getLastIndex());
Assert.assertEquals(-1, servers.get(1).getMember(gid).getLastIndex());
- Set<TestEntry> allEntries = new HashSet<>();
for (int i = 0; i < CHECK_POINT; i++) {
servers.get(0).write(gid, new TestEntry(i, peers.get(0)));
- servers.get(1).write(gid, new TestEntry(i, peers.get(1)));
- Assert.assertEquals(i, servers.get(0).getMember(gid).getLastIndex());
- Assert.assertEquals(i, servers.get(1).getMember(gid).getLastIndex());
- allEntries.add(new TestEntry(i, peers.get(2)));
}
for (int i = 0; i < 2; i++) {