This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_scalability
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit eb45f3063414f129eee8bb5e4f99fe530dd1f541
Author: lta <[email protected]>
AuthorDate: Wed Apr 7 22:25:31 2021 +0800

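    This commit fixes the following issues:
    1. Fix a deadlock between headerGroupMap and asyncServiceMap in method
    addDataGroupMember: resetServiceCache is now called after leaving the
    synchronized (headerGroupMap) block instead of inside it.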
    2. Hold the heartbeat wait object's monitor (synchronized) when calling
    notifyAll() on it, as Object.notifyAll() requires.
---
 .../iotdb/cluster/server/DataClusterServer.java    | 22 +++++++----
 .../cluster/server/member/DataGroupMember.java     | 43 +++-------------------
 .../cluster/server/member/MetaGroupMember.java     | 32 ++++++++++------
 3 files changed, 41 insertions(+), 56 deletions(-)

diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 0e48aac..91d49cf 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -122,15 +122,18 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
   public DataGroupMember addDataGroupMember(DataGroupMember dataGroupMember, 
RaftNode header) {
     synchronized (headerGroupMap) {
       if (headerGroupMap.containsKey(header)) {
-        logger.debug("group {} already exist.", dataGroupMember.getAllNodes());
+        logger.debug("Group {} already exist.", dataGroupMember.getAllNodes());
         return headerGroupMap.get(header);
       }
       stoppedMemberManager.remove(header);
       headerGroupMap.put(header, dataGroupMember);
-      resetServiceCache(header);
+
       dataGroupMember.start();
-      return dataGroupMember;
     }
+    logger.info("Add group {} successfully.", dataGroupMember.getName());
+    resetServiceCache(header); // avoid dead-lock
+
+    return dataGroupMember;
   }
 
   private void resetServiceCache(RaftNode header) {
@@ -597,12 +600,17 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
    * @param header
    * @param dataGroupMember
    */
-  private void removeMember(RaftNode header, DataGroupMember dataGroupMember, 
boolean waitFollowersToSync) {
+  private void removeMember(RaftNode header, DataGroupMember dataGroupMember, 
boolean removedGroup) {
     dataGroupMember.setReadOnly();
-    if (waitFollowersToSync && dataGroupMember.getCharacter() == 
NodeCharacter.LEADER) {
-      dataGroupMember.getAppendLogThreadPool().submit(() -> 
dataGroupMember.waitFollowersToSync());
-    } else {
+    if (!removedGroup) {
       dataGroupMember.stop();
+    } else {
+      if (dataGroupMember.getCharacter() != NodeCharacter.LEADER) {
+        new Thread(()-> {
+          dataGroupMember.syncLeader();
+          dataGroupMember.stop();
+        }).start();
+      }
     }
     stoppedMemberManager.put(header, dataGroupMember);
     logger.info("Data group member has removed, header {}, group is {}.", 
header,
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 8cb12cc..f142f67 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -350,6 +350,8 @@ public class DataGroupMember extends RaftMember {
           synchronized (term) {
             setCharacter(NodeCharacter.ELECTOR);
             setLeader(null);
+          }
+          synchronized (getHeartBeatWaitObject()) {
             getHeartBeatWaitObject().notifyAll();
           }
         }
@@ -759,6 +761,8 @@ public class DataGroupMember extends RaftMember {
           synchronized (term) {
             setCharacter(NodeCharacter.ELECTOR);
             setLeader(null);
+          }
+          synchronized (getHeartBeatWaitObject()) {
             getHeartBeatWaitObject().notifyAll();
           }
         }
@@ -796,39 +800,6 @@ public class DataGroupMember extends RaftMember {
   }
 
   /**
-   * When the header of a partition group is removed, it needs to wait all 
followers to sync data
-   * because there has no new leader.
-   */
-  public void waitFollowersToSync() {
-    try {
-      for (Map.Entry<Node, Peer> entry : peerMap.entrySet()) {
-        Node node = entry.getKey();
-        if (node.equals(thisNode)) {
-          continue;
-        }
-        Peer peer = entry.getValue();
-        while (peer.getMatchIndex() < logManager.getCommitLogIndex()) {
-          try {
-            Thread.sleep(10);
-            if (character != NodeCharacter.LEADER) {
-              return;
-            }
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            logger
-                .warn("{}: Unexpected interruption when waiting follower {} to 
sync, raft id is {}",
-                    name, node, getRaftGroupId());
-          }
-        }
-        logger.info("{}: Follower {} has synced with leader, raft id is {}", 
name, node,
-            getRaftGroupId());
-      }
-    } finally {
-      stop();
-    }
-  }
-
-  /**
    * Generate a report containing the character, leader, term, last log term, 
last log index, header
    * and readOnly or not of this member.
    */
@@ -864,7 +835,7 @@ public class DataGroupMember extends RaftMember {
   }
 
   public boolean onSnapshotInstalled(List<Integer> slots) {
-    if (!isHasSyncedLeaderBeforeRemoved()) {
+    if 
(getMetaGroupMember().getPartitionTable().getAllNodes().contains(thisNode)) {
       
getMetaGroupMember().waitUtil(getMetaGroupMember().getPartitionTable().getLastMetaLogIndex());
     }
     if (logger.isDebugEnabled()) {
@@ -926,10 +897,6 @@ public class DataGroupMember extends RaftMember {
     lastAppliedPartitionTableVersion.save();
   }
 
-  public long getLastAppliedPartitionTableVersion() {
-    return lastAppliedPartitionTableVersion.getVersion();
-  }
-
   private class LastAppliedPatitionTableVersion {
 
     private static final String VERSION_FILE_NAME = 
"LAST_PARTITION_TABLE_VERSION";
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 4a33693..f064c86 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -605,6 +605,7 @@ public class MetaGroupMember extends RaftMember {
       logger.info("Node {} admitted this node into the cluster", node);
       ByteBuffer partitionTableBuffer = resp.partitionTableBytes;
       acceptPartitionTable(partitionTableBuffer, true);
+      getDataClusterServer().pullSnapshots();
       return true;
     } else if (resp.getRespNum() == Response.RESPONSE_IDENTIFIER_CONFLICT) {
       logger.info("The identifier {} conflicts the existing ones, regenerate a 
new one",
@@ -719,8 +720,6 @@ public class MetaGroupMember extends RaftMember {
     updateNodeList(newTable.getAllNodes());
 
     startSubServers();
-
-    getDataClusterServer().pullSnapshots();
   }
 
   private void updateNodeList(Collection<Node> nodes) {
@@ -2219,20 +2218,31 @@ public class MetaGroupMember extends RaftMember {
       getDataClusterServer().removeNode(oldNode, result);
 
       // the leader is removed, start the next election ASAP
-      if (oldNode.equals(leader.get())) {
-        setCharacter(NodeCharacter.ELECTOR);
-        setLeader(ClusterConstant.EMPTY_NODE);
-        lastHeartbeatReceivedTime = Long.MIN_VALUE;
+      if (oldNode.equals(leader.get()) && !oldNode.equals(thisNode)) {
+        synchronized (term) {
+          setCharacter(NodeCharacter.ELECTOR);
+          setLeader(null);
+        }
+        synchronized (getHeartBeatWaitObject()) {
+          getHeartBeatWaitObject().notifyAll();
+        }
       }
 
       if (oldNode.equals(thisNode)) {
         // use super.stop() so that the data server will not be closed because 
other nodes may
         // want to pull data from this node
-        super.stop();
-        if (clientServer != null) {
-          clientServer.stop();
-        }
-        logger.info("{} has been removed from the cluster", name);
+        new Thread(() -> {
+          try {
+            Thread.sleep(RaftServer.getHeartBeatIntervalMs());
+          } catch (InterruptedException e) {
+            //ignore
+          }
+          super.stop();
+          if (clientServer != null) {
+            clientServer.stop();
+          }
+          logger.info("{} has been removed from the cluster", name);
+        }).start();
       } else if (thisNode.equals(leader.get())) {
         // as the old node is removed, it cannot know this by heartbeat or 
log, so it should be
         // directly kicked out of the cluster

Reply via email to