This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/expr by this push:
     new 4c2d92e  fix tests
4c2d92e is described below

commit 4c2d92ee5b1a1326e04052966bd8a117a202e978
Author: jt <[email protected]>
AuthorDate: Wed Jan 5 18:27:37 2022 +0800

    fix tests
---
 .../apache/iotdb/cluster/log/LogDispatcher.java    |  2 ++
 .../iotdb/cluster/partition/PartitionTable.java    |  7 ++++
 .../cluster/partition/slot/SlotPartitionTable.java |  2 +-
 .../cluster/server/member/MetaGroupMember.java     |  4 +--
 .../iotdb/cluster/server/member/RaftMember.java    |  2 ++
 .../cluster/server/service/DataGroupEngine.java    | 16 +++++----
 .../cluster/server/service/MetaAsyncService.java   |  9 +++--
 .../server/heartbeat/MetaHeartbeatThreadTest.java  |  5 +++
 .../iotdb/cluster/server/member/BaseMember.java    | 10 ++++--
 .../cluster/server/member/MetaGroupMemberTest.java | 39 ++++++++++++++++------
 10 files changed, 70 insertions(+), 26 deletions(-)

diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 2626d69..8392289 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -428,6 +428,8 @@ public class LogDispatcher {
       try {
         long operationStartTime = 
Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
         for (int i = 0; i < retries; i++) {
+          
logRequest.getVotingLog().getFailedNodeIds().remove(receiver.nodeIdentifier);
+          
logRequest.getVotingLog().getStronglyAcceptedNodeIds().remove(Integer.MAX_VALUE);
           AppendEntryResult result = 
syncClient.appendEntry(logRequest.appendEntryRequest);
           if (result.status == Response.RESPONSE_OUT_OF_WINDOW) {
             Thread.sleep(100);
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
index 2767d4e..caecf06 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
@@ -93,6 +93,13 @@ public interface PartitionTable {
    */
   PartitionGroup getPartitionGroup(RaftNode headerNode);
 
+  /**
+   * find replicationNum groups that a node is in
+   * @param node
+   * @return
+   */
+  List<PartitionGroup> getPartitionGroups(Node node);
+
   ByteBuffer serialize();
 
   /**
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
index 60c532a..d0bbc73 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
@@ -189,7 +189,7 @@ public class SlotPartitionTable implements PartitionTable {
   }
 
   // find replicationNum groups that a node is in
-  private List<PartitionGroup> getPartitionGroups(Node node) {
+  public List<PartitionGroup> getPartitionGroups(Node node) {
     List<PartitionGroup> ret = new ArrayList<>();
 
     int nodeIndex = findNodeIndex(node, nodeRing);
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index fc2ddc0..e312319 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -740,7 +740,7 @@ public class MetaGroupMember extends RaftMember implements 
IService, MetaGroupMe
     logger.info("Starting sub-servers...");
     synchronized (partitionTable) {
       try {
-        getDataGroupEngine().buildDataGroupMembers(partitionTable);
+        getDataGroupEngine().buildDataGroupMembers(thisNode, partitionTable);
         sendHandshake();
       } catch (Exception e) {
         logger.error("Build partition table failed: ", e);
@@ -1701,7 +1701,7 @@ public class MetaGroupMember extends RaftMember 
implements IService, MetaGroupMe
       // update DataGroupMembers, as the node is removed, the members of some 
groups are
       // changed and there will also be one less group
       NodeRemovalResult result = partitionTable.getNodeRemovalResult();
-      getDataGroupEngine().removeNode(oldNode, result);
+      getDataGroupEngine().removeNode(oldNode, thisNode, result);
 
       // the leader is removed, start the next election ASAP
       if (oldNode.equals(leader.get()) && !oldNode.equals(thisNode)) {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index ea3b463..66fee6b 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -2005,6 +2005,8 @@ public abstract class RaftMember implements 
RaftMemberMBean {
     AtomicLong newLeaderTerm = new AtomicLong(term.get());
 
     AppendEntryRequest request = buildAppendEntryRequest(log.getLog(), true);
+    log.getFailedNodeIds().clear();
+    log.getStronglyAcceptedNodeIds().remove(Integer.MAX_VALUE);
 
     try {
       if (allNodes.size() > 2) {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupEngine.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupEngine.java
index 6409acd..6f68acb 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupEngine.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupEngine.java
@@ -71,7 +71,7 @@ public class DataGroupEngine implements IService, 
DataGroupEngineMBean {
   private PartitionTable partitionTable;
   private DataGroupMember.Factory dataMemberFactory;
   private static MetaGroupMember metaGroupMember;
-  private final Node thisNode = ClusterIoTDB.getInstance().getThisNode();
+  private Node thisNode = ClusterIoTDB.getInstance().getThisNode();
   private static TProtocolFactory protocolFactory;
 
   private DataGroupEngine() {
@@ -368,7 +368,8 @@ public class DataGroupEngine implements IService, 
DataGroupEngineMBean {
    * group which the local node is in) and start them.
    */
   @SuppressWarnings("java:S1135")
-  public void buildDataGroupMembers(PartitionTable partitionTable) {
+  public void buildDataGroupMembers(Node thisNode,
+      PartitionTable partitionTable) {
     setPartitionTable(partitionTable);
     // TODO-Cluster: if there are unchanged members, do not stop and restart 
them
     // clear previous members if the partition table is reloaded
@@ -380,7 +381,7 @@ public class DataGroupEngine implements IService, 
DataGroupEngineMBean {
       value.setUnchanged(false);
     }
 
-    List<PartitionGroup> partitionGroups = partitionTable.getLocalGroups();
+    List<PartitionGroup> partitionGroups = 
partitionTable.getPartitionGroups(thisNode);
     for (PartitionGroup partitionGroup : partitionGroups) {
       RaftNode header = partitionGroup.getHeader();
       DataGroupMember prevMember = headerGroupMap.get(header);
@@ -426,17 +427,18 @@ public class DataGroupEngine implements IService, 
DataGroupEngineMBean {
    * group, set the member to read only so that it can still provide data for 
other nodes that has
    * not yet pulled its data. Otherwise, just change the node list of the 
member and pull new data.
    * And create a new DataGroupMember if this node should join a new group 
because of this removal.
-   *
-   * @param node
+   *  @param node
+   * @param thisNode
    * @param removalResult cluster changes due to the node removal
    */
-  public void removeNode(Node node, NodeRemovalResult removalResult) {
+  public void removeNode(Node node, Node thisNode,
+      NodeRemovalResult removalResult) {
     Iterator<Entry<RaftNode, DataGroupMember>> entryIterator = 
headerGroupMap.entrySet().iterator();
     synchronized (headerGroupMap) {
       while (entryIterator.hasNext()) {
         Entry<RaftNode, DataGroupMember> entry = entryIterator.next();
         DataGroupMember dataGroupMember = entry.getValue();
-        if (dataGroupMember.getHeader().getNode().equals(node) || 
node.equals(thisNode)) {
+        if (dataGroupMember.getHeader().getNode().equals(node) || 
node.equals(this.thisNode)) {
           entryIterator.remove();
           removeMember(
               entry.getKey(), dataGroupMember, 
dataGroupMember.getHeader().getNode().equals(node));
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
index 878528b..ff29dc4 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
@@ -50,6 +50,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 public class MetaAsyncService extends BaseAsyncService implements 
TSMetaService.AsyncIface {
+
   private static final String ERROR_MSG_META_NOT_READY = "The metadata not is 
not ready.";
   private static final Logger logger = 
LoggerFactory.getLogger(MetaAsyncService.class);
 
@@ -61,7 +62,8 @@ public class MetaAsyncService extends BaseAsyncService 
implements TSMetaService.
   }
 
   @Override
-  public void appendEntry(AppendEntryRequest request, AsyncMethodCallback 
resultHandler) {
+  public void appendEntry(AppendEntryRequest request,
+      AsyncMethodCallback<AppendEntryResult> resultHandler) {
     // if the metaGroupMember is not ready (e.g., as a follower the 
PartitionTable is loaded
     // locally, but the partition table is not verified), we do not handle the 
RPC requests.
     if (!metaGroupMember.isReady() && metaGroupMember.getPartitionTable() == 
null) {
@@ -71,7 +73,8 @@ public class MetaAsyncService extends BaseAsyncService 
implements TSMetaService.
       // ready.
       // this node lacks information of the cluster and refuse to work
       logger.debug("This node is blind to the cluster and cannot accept logs");
-      resultHandler.onComplete(Response.RESPONSE_PARTITION_TABLE_UNAVAILABLE);
+      resultHandler.onComplete(
+          new 
AppendEntryResult().setStatus(Response.RESPONSE_PARTITION_TABLE_UNAVAILABLE));
       return;
     }
 
@@ -214,7 +217,7 @@ public class MetaAsyncService extends BaseAsyncService 
implements TSMetaService.
   /**
    * Forward a node removal request to the leader.
    *
-   * @param node the node to be removed
+   * @param node          the node to be removed
    * @param resultHandler
    * @return true if the request is successfully forwarded, false otherwise
    */
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
 
b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
index 00b3f83..aea1eab 100644
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
@@ -101,6 +101,11 @@ public class MetaHeartbeatThreadTest extends 
HeartbeatThreadTest {
         }
 
         @Override
+        public List<PartitionGroup> getPartitionGroups(Node node) {
+          return null;
+        }
+
+        @Override
         public boolean deserialize(ByteBuffer buffer) {
           return true;
         }
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/BaseMember.java 
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/BaseMember.java
index f897701..c585022 100644
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/BaseMember.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/BaseMember.java
@@ -19,6 +19,9 @@
 
 package org.apache.iotdb.cluster.server.member;
 
+import static 
org.apache.iotdb.cluster.server.member.MetaGroupMember.NODE_IDENTIFIER_FILE_NAME;
+import static 
org.apache.iotdb.cluster.server.member.MetaGroupMember.PARTITION_FILE_NAME;
+
 import org.apache.iotdb.cluster.ClusterIoTDB;
 import org.apache.iotdb.cluster.client.ClientCategory;
 import org.apache.iotdb.cluster.client.ClientManager;
@@ -203,8 +206,8 @@ public class BaseMember {
     RegisterManager.setDeregisterTimeOut(100);
     EnvironmentUtils.cleanEnv();
     ClusterDescriptor.getInstance().getConfig().setSeedNodeUrls(prevUrls);
-    new File(MetaGroupMember.PARTITION_FILE_NAME).delete();
-    new File(MetaGroupMember.NODE_IDENTIFIER_FILE_NAME).delete();
+    new File(PARTITION_FILE_NAME).delete();
+    new File(NODE_IDENTIFIER_FILE_NAME).delete();
     RaftMember.setWaitLeaderTimeMs(prevLeaderWait);
     testThreadPool.shutdownNow();
     
ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(prevUseAsyncServer);
@@ -215,6 +218,9 @@ public class BaseMember {
     ClusterConstant.setSyncLeaderMaxWaitMs(syncLeaderMaxWait);
     ClusterConstant.setHeartbeatIntervalMs(heartBeatInterval);
     ClusterConstant.setElectionTimeoutMs(electionTimeout);
+
+    new File(PARTITION_FILE_NAME).delete();
+    new File(NODE_IDENTIFIER_FILE_NAME).delete();
   }
 
   DataGroupMember getDataGroupMember(Node node) {
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
 
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 99aa6d3..ac2996b 100644
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -144,6 +144,7 @@ import static 
org.apache.iotdb.cluster.server.NodeCharacter.LEADER;
 import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -189,7 +190,7 @@ public class MetaGroupMemberTest extends BaseMember {
             new DataGroupMember.Factory(new Factory(), testMetaMember) {
               @Override
               public DataGroupMember create(Node thisNode, PartitionGroup 
partitionGroup) {
-                return getDataGroupMember(partitionGroup, 
TestUtils.getNode(0));
+                return getDataGroupMember(partitionGroup, thisNode);
               }
             },
             testMetaMember);
@@ -1026,7 +1027,7 @@ public class MetaGroupMemberTest extends BaseMember {
   }
 
   @Test
-  public void testProcessValidHeartbeatReq() throws QueryProcessException {
+  public void testProcessValidHeartbeatReq() {
     System.out.println("Start testProcessValidHeartbeatReq()");
     MetaGroupMember testMetaMember = getMetaGroupMember(TestUtils.getNode(10));
     partitionTable = new SlotPartitionTable(allNodes, TestUtils.getNode(0));
@@ -1041,9 +1042,23 @@ public class MetaGroupMemberTest extends BaseMember {
       request.setRegenerateIdentifier(true);
       testMetaMember.setPartitionTable(null);
       testMetaMember.processValidHeartbeatReq(request, response);
-      assertTrue(response.getFollowerIdentifier() != 10);
+      assertNotEquals(10, response.getFollowerIdentifier());
       assertTrue(response.isRequirePartitionTable());
+    } finally {
+      testMetaMember.stop();
+    }
+  }
 
+  @Test
+  public void testProcessValidHeartbeatReq2() {
+    System.out.println("Start testProcessValidHeartbeatReq()");
+    MetaGroupMember testMetaMember = getMetaGroupMember(TestUtils.getNode(10));
+    partitionTable = new SlotPartitionTable(allNodes, TestUtils.getNode(0));
+    testMetaMember.setCoordinator(new Coordinator());
+    try {
+      HeartBeatRequest request = new HeartBeatRequest();
+      HeartBeatResponse response = new HeartBeatResponse();
+      request.setRegenerateIdentifier(false);
       request.setPartitionTableBytes(partitionTable.serialize());
       testMetaMember.processValidHeartbeatReq(request, response);
       assertEquals(partitionTable, testMetaMember.getPartitionTable());
@@ -1055,7 +1070,7 @@ public class MetaGroupMemberTest extends BaseMember {
   @Test
   public void testProcessValidHeartbeatResp() throws QueryProcessException {
     System.out.println("Start testProcessValidHeartbeatResp()");
-    MetaGroupMember metaGroupMember = getMetaGroupMember(TestUtils.getNode(9));
+    MetaGroupMember metaGroupMember = 
getMetaGroupMember(TestUtils.getNode(10));
     metaGroupMember.start();
     metaGroupMember.onElectionWins();
     try {
@@ -1088,20 +1103,21 @@ public class MetaGroupMemberTest extends BaseMember {
     request.setLeaderCommit(0);
     request.setPrevLogIndex(-1);
     request.setPrevLogTerm(-1);
+    request.setIsFromLeader(true);
     request.setLeader(new Node("127.0.0.1", 30000, 0, 40000, 
Constants.RPC_PORT, "127.0.0.1"));
-    AtomicReference<Long> result = new AtomicReference<>();
-    GenericHandler<Long> handler = new GenericHandler<>(TestUtils.getNode(0), 
result);
+    AtomicReference<AppendEntryResult> result = new AtomicReference<>();
+    GenericHandler<AppendEntryResult> handler = new 
GenericHandler<>(TestUtils.getNode(0), result);
     testMetaMember.setPartitionTable(null);
     testMetaMember.setReady(false);
     new MetaAsyncService(testMetaMember).appendEntry(request, handler);
-    assertEquals(Response.RESPONSE_PARTITION_TABLE_UNAVAILABLE, (long) 
result.get());
+    assertEquals(Response.RESPONSE_PARTITION_TABLE_UNAVAILABLE, 
result.get().status);
     System.out.println("Term after first append: " + 
testMetaMember.getTerm().get());
 
     testMetaMember.setPartitionTable(partitionTable);
     testMetaMember.setReady(true);
     new MetaAsyncService(testMetaMember).appendEntry(request, handler);
     System.out.println("Term after second append: " + 
testMetaMember.getTerm().get());
-    assertEquals(Response.RESPONSE_AGREE, (long) result.get());
+    assertEquals(Response.RESPONSE_STRONG_ACCEPT,  result.get().status);
   }
 
   @Test
@@ -1249,10 +1265,11 @@ public class MetaGroupMemberTest extends BaseMember {
     System.out.println("Start testLoadIdentifier()");
     try (RandomAccessFile raf =
         new RandomAccessFile(MetaGroupMember.NODE_IDENTIFIER_FILE_NAME, "rw")) 
{
-      raf.writeBytes("100");
+      raf.writeBytes("50");
     }
-    MetaGroupMember metaGroupMember = getMetaGroupMember(new Node());
-    assertEquals(100, metaGroupMember.getThisNode().getNodeIdentifier());
+    MetaGroupMember metaGroupMember =
+        getMetaGroupMember(TestUtils.getNode(50).setNodeIdentifier(0));
+    assertEquals(50, metaGroupMember.getThisNode().getNodeIdentifier());
     metaGroupMember.stop();
   }
 

Reply via email to