This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/native_raft by this push:
     new 0cf0c1e028 fix leader check and forward
0cf0c1e028 is described below

commit 0cf0c1e028808d8c13031e7755c8fe2b8c4d8842
Author: Tian Jiang <[email protected]>
AuthorDate: Fri Feb 17 15:35:08 2023 +0800

    fix leader check and forward
---
 .../iotdb/consensus/natraft/protocol/RaftMember.java    | 17 +++++++++++++++++
 .../org/apache/iotdb/consensus/raft/ReplicateTest.java  |  7 +------
 2 files changed, 18 insertions(+), 6 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index 8e2a3302c1..8ace88143f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -567,11 +567,28 @@ public class RaftMember {
     WEAK_ACCEPT
   }
 
+  public boolean isLeader() {
+    return Objects.equals(status.leader.get(), thisNode);
+  }
+
   public TSStatus processRequest(IConsensusRequest request) {
     if (readOnly) {
       return StatusUtils.NODE_READ_ONLY;
     }
 
+    if (getLeader() == null) {
+      waitLeader();
+    }
+
+    if (!isLeader()) {
+      Peer leader = getLeader();
+      if (leader == null) {
+        return StatusUtils.NO_LEADER;
+      } else {
+        return forwardRequest(request, leader.getEndpoint(), leader.getGroupId());
+      }
+    }
+
     logger.debug("{}: Processing request {}", name, request);
     Entry entry = new RequestEntry(request);
 
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/raft/ReplicateTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/raft/ReplicateTest.java
index a4dc42c236..e4c59f6115 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/raft/ReplicateTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/raft/ReplicateTest.java
@@ -126,7 +126,7 @@ public class ReplicateTest {
    * The three nodes use the requests in the queue to replicate the requests to the other two nodes
    */
   @Test
-  public void ReplicateUsingQueueTest() throws IOException, InterruptedException {
+  public void Replicate3NodeTest() throws IOException, InterruptedException {
     logger.info("Start ReplicateUsingQueueTest");
     servers.get(0).createPeer(group.getGroupId(), group.getPeers());
     servers.get(1).createPeer(group.getGroupId(), group.getPeers());
@@ -206,13 +206,8 @@ public class ReplicateTest {
     Assert.assertEquals(-1, servers.get(0).getMember(gid).getLastIndex());
     Assert.assertEquals(-1, servers.get(1).getMember(gid).getLastIndex());
 
-    Set<TestEntry> allEntries = new HashSet<>();
     for (int i = 0; i < CHECK_POINT; i++) {
       servers.get(0).write(gid, new TestEntry(i, peers.get(0)));
-      servers.get(1).write(gid, new TestEntry(i, peers.get(1)));
-      Assert.assertEquals(i, servers.get(0).getMember(gid).getLastIndex());
-      Assert.assertEquals(i, servers.get(1).getMember(gid).getLastIndex());
-      allEntries.add(new TestEntry(i, peers.get(2)));
     }
 
     for (int i = 0; i < 2; i++) {

Reply via email to