This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_scalability
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 82abad07ae4509752809f1be84702176e7b1af44
Author: lta <[email protected]>
AuthorDate: Thu Apr 1 11:01:16 2021 +0800

    This commit fixes the following issues:
    1. add a lock when putting a DataGroupMember into headerMap in order to handle
concurrent issues
    2. fix a bug of using a not-started DataGroupMember to pull snapshots
    3. put exile into the background
---
 .../java/org/apache/iotdb/cluster/ClusterMain.java |  2 +-
 .../iotdb/cluster/config/ClusterConstant.java      |  2 ++
 .../cluster/log/snapshot/PullSnapshotTask.java     |  2 ++
 .../iotdb/cluster/query/reader/DataSourceInfo.java |  6 ++--
 .../iotdb/cluster/server/DataClusterServer.java    | 33 +++++++++----------
 .../cluster/server/member/DataGroupMember.java     | 37 ++++++++++++++++------
 .../cluster/server/member/MetaGroupMember.java     | 33 ++++++++++++++++---
 .../iotdb/cluster/server/member/RaftMember.java    | 18 ++++-------
 .../cluster/server/member/MetaGroupMemberTest.java |  3 +-
 9 files changed, 91 insertions(+), 45 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
index 8164e8e..ecef889 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
@@ -86,7 +86,7 @@ public class ClusterMain {
     }
 
     IoTDBDescriptor.getInstance().getConfig().setSyncEnable(false);
-    IoTDBDescriptor.getInstance().getConfig().setEnablePartition(true);
+    IoTDBDescriptor.getInstance().getConfig().setEnablePartition(false);
     logger.info("Running mode {}", mode);
     if (MODE_START.equals(mode)) {
       try {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
index 8e3f304..b846849 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
@@ -37,6 +37,8 @@ public class ClusterConstant {
 
   public static final int RETRY_WAIT_TIME_MS = 10;
 
+  /**
+   * Maximum time, in seconds, to wait for a thread pool to terminate after shutdownNow(). Every
+   * caller passes this value together with TimeUnit.SECONDS, so it must be expressed in seconds.
+   */
+  public static final int THREAD_POLL_WAIT_TERMINATION_TIME = 10;
+
   public static final Node EMPTY_NODE = new Node();
 
   private ClusterConstant() {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
index e1ae342..8f46616 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
@@ -181,6 +181,7 @@ public class PullSnapshotTask<T extends Snapshot> 
implements Callable<Void> {
       try {
         // sequentially pick up a node that may have this slot
         nodeIndex = (nodeIndex + 1) % descriptor.getPreviousHolders().size();
+        long startTime = System.currentTimeMillis();
         finished = pullSnapshot(nodeIndex);
         if (!finished) {
           if (logger.isDebugEnabled()) {
@@ -191,6 +192,7 @@ public class PullSnapshotTask<T extends Snapshot> 
implements Callable<Void> {
               .sleep(
                   
ClusterDescriptor.getInstance().getConfig().getPullSnapshotRetryIntervalMs());
         }
+        logger.debug("{}: Data migration ends, cost {}ms", newMember, 
(System.currentTimeMillis() - startTime));
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         finished = true;
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
index 8dcd45f..24b6bc5 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
@@ -90,19 +90,21 @@ public class DataSourceInfo {
         Long newReaderId = getReaderId(node, byTimestamp, timestamp);
         if (newReaderId != null) {
           logger.debug("get a readerId {} for {} from {}", newReaderId, 
request.path, node);
-          if (newReaderId != -1) {
+          if (newReaderId >= 0) {
             // register the node so the remote resources can be released
             context.registerRemoteNode(node, partitionGroup.getHeader(), 
partitionGroup.getId());
             this.readerId = newReaderId;
             this.curSource = node;
             this.curPos = nextNodePos;
             return true;
-          } else {
+          } else if (newReaderId == -1){
             // the id being -1 means there is no satisfying data on the remote 
node, create an
             // empty reader to reduce further communication
             this.isNoClient = true;
             this.isNoData = true;
             return false;
+          } else {
+            logger.debug("change other client for better query performance.");
           }
         }
       } catch (TException | IOException e) {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 20e1471..0e48aac 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -119,17 +119,18 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
    *
    * @param dataGroupMember
    */
-  public void addDataGroupMember(DataGroupMember dataGroupMember) {
-    RaftNode header = new RaftNode(dataGroupMember.getHeader(),
-        dataGroupMember.getRaftGroupId());
-    if (headerGroupMap.containsKey(header)) {
-      logger.debug("group {} already exist.", dataGroupMember.getAllNodes());
-      return;
+  public DataGroupMember addDataGroupMember(DataGroupMember dataGroupMember, 
RaftNode header) {
+    synchronized (headerGroupMap) {
+      if (headerGroupMap.containsKey(header)) {
+        logger.debug("group {} already exist.", dataGroupMember.getAllNodes());
+        return headerGroupMap.get(header);
+      }
+      stoppedMemberManager.remove(header);
+      headerGroupMap.put(header, dataGroupMember);
+      resetServiceCache(header);
+      dataGroupMember.start();
+      return dataGroupMember;
     }
-    stoppedMemberManager.remove(header);
-    headerGroupMap.put(header, dataGroupMember);
-    resetServiceCache(header);
-    dataGroupMember.start();
   }
 
   private void resetServiceCache(RaftNode header) {
@@ -541,9 +542,7 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
    */
   public void addNode(Node node, NodeAdditionResult result) {
     // If the node executed adding itself to the cluster, it's unnecessary to 
add new groups because they already exist.
-    // Just pull snapshot.
     if (node.equals(thisNode)) {
-      pullSnapshots();
       return;
     }
     Iterator<Entry<RaftNode, DataGroupMember>> entryIterator = 
headerGroupMap.entrySet().iterator();
@@ -566,9 +565,10 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
       }
       for (PartitionGroup newGroup : result.getNewGroupList()) {
         if (newGroup.contains(thisNode)) {
+          RaftNode header = new RaftNode(newGroup.getHeader(), 
newGroup.getId());
           logger.info("Adding this node into a new group {}", newGroup);
           DataGroupMember dataGroupMember = dataMemberFactory.create(newGroup, 
thisNode);
-          addDataGroupMember(dataGroupMember);
+          dataGroupMember = addDataGroupMember(dataGroupMember, header);
           dataGroupMember
               .pullNodeAdditionSnapshots(((SlotPartitionTable) 
partitionTable).getNodeSlots(node,
                   newGroup.getId()), node);
@@ -631,13 +631,14 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
 
     List<PartitionGroup> partitionGroups = partitionTable.getLocalGroups();
     for (PartitionGroup partitionGroup : partitionGroups) {
-      DataGroupMember prevMember = headerGroupMap.get(new 
RaftNode(partitionGroup.getHeader(), partitionGroup.getId()));
+      RaftNode header = new RaftNode(partitionGroup.getHeader(), 
partitionGroup.getId());
+      DataGroupMember prevMember = headerGroupMap.get(header);
       if (prevMember == null || 
!prevMember.getAllNodes().equals(partitionGroup)) {
         logger.info("Building member of data group: {}", partitionGroup);
         // no previous member or member changed
         DataGroupMember dataGroupMember = 
dataMemberFactory.create(partitionGroup, thisNode);
         // the previous member will be replaced here
-        addDataGroupMember(dataGroupMember);
+        addDataGroupMember(dataGroupMember, header);
         dataGroupMember.setUnchanged(true);
       } else {
         prevMember.setUnchanged(true);
@@ -698,7 +699,7 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
         if (!headerGroupMap.containsKey(header)) {
           logger.info("{} should join a new group {}", thisNode, group);
           DataGroupMember dataGroupMember = dataMemberFactory.create(group, 
thisNode);
-          addDataGroupMember(dataGroupMember);
+          addDataGroupMember(dataGroupMember, header);
         }
         // pull new slots from the removed node
         headerGroupMap.get(header).pullSlots(removalResult);
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 962c466..8cb12cc 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -19,8 +19,9 @@
 
 package org.apache.iotdb.cluster.server.member;
 
+import static 
org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME;
+
 import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
@@ -202,6 +203,10 @@ public class DataGroupMember extends RaftMember {
     heartBeatService.submit(new DataHeartbeatThread(this));
     pullSnapshotService = 
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
     pullSnapshotHintService = new PullSnapshotHintService(this);
+    logger.info("{}: has inited pullSnapshotService and 
pullSnapshotHintService", name);
+    if (pullSnapshotHintService == null) {
+      logger.error("{}: pullSnapshotHintService is null", name);
+    }
     pullSnapshotHintService.start();
     resumePullSnapshotTasks();
   }
@@ -217,7 +222,7 @@ public class DataGroupMember extends RaftMember {
     if (pullSnapshotService != null) {
       pullSnapshotService.shutdownNow();
       try {
-        pullSnapshotService.awaitTermination(10, TimeUnit.SECONDS);
+        
pullSnapshotService.awaitTermination(THREAD_POLL_WAIT_TERMINATION_TIME, 
TimeUnit.SECONDS);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         logger.error("Unexpected interruption when waiting for 
pullSnapshotService to end", e);
@@ -234,12 +239,24 @@ public class DataGroupMember extends RaftMember {
     logger.info("{}: stopped", name);
   }
 
-  /**
-   * The first node (on the hash ring) in this data group is the header. It 
determines the duty
-   * (what range on the ring do the group take responsibility for) of the 
group and although other
-   * nodes in this may change, this node is unchangeable unless the data group 
is dismissed. It is
-   * also the identifier of this data group.
-   */
+  /**
+   * Rejects an election whose elector is not a member of this data group; otherwise defers to
+   * the log-progress check of the parent class.
+   *
+   * @param electionRequest the election request, carrying the elector to validate
+   * @return Response.RESPONSE_NODE_IS_NOT_IN_GROUP if the elector is not in allNodes, otherwise
+   *     the result of super.checkElectorLogProgress
+   */
+  @Override
+  long checkElectorLogProgress(ElectionRequest electionRequest) {
+    Node elector = electionRequest.getElector();
+    // check if the node is in the group
+    if (!allNodes.contains(elector)) {
+      // placeholders are, in order: member name, elector, group
+      logger.info("{}: the elector {} is not in the data group {}, so reject this election.",
+          name, elector, getPartitionGroup());
+      return Response.RESPONSE_NODE_IS_NOT_IN_GROUP;
+    }
+    return super.checkElectorLogProgress(electionRequest);
+  }
+
+    /**
+     * The first node (on the hash ring) in this data group is the header. It 
determines the duty
+     * (what range on the ring do the group take responsibility for) of the 
group and although other
+     * nodes in this may change, this node is unchangeable unless the data 
group is dismissed. It is
+     * also the identifier of this data group.
+     */
   @Override
   public Node getHeader() {
     return allNodes.get(0);
@@ -370,7 +387,9 @@ public class DataGroupMember extends RaftMember {
     }
 
     // Make sure local data is complete.
-    if (lastAppliedPartitionTableVersion.getVersion() != 
metaGroupMember.getPartitionTable().getLastMetaLogIndex()) {
+    if (character != NodeCharacter.LEADER
+        && lastAppliedPartitionTableVersion.getVersion() != 
metaGroupMember.getPartitionTable()
+        .getLastMetaLogIndex()) {
       return null;
     }
 
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 95dbd87..4a33693 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.cluster.server.member;
 
+import static 
org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME;
 import static 
org.apache.iotdb.cluster.utils.ClusterUtils.WAIT_START_UP_CHECK_TIME_SEC;
 import static 
org.apache.iotdb.cluster.utils.ClusterUtils.analyseStartUpCheckResult;
 
@@ -94,6 +95,7 @@ import 
org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
 import org.apache.iotdb.cluster.query.ClusterPlanRouter;
 import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
 import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
+import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
 import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
 import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -379,7 +381,7 @@ public class MetaGroupMember extends RaftMember {
     if (reportThread != null) {
       reportThread.shutdownNow();
       try {
-        reportThread.awaitTermination(10, TimeUnit.SECONDS);
+        reportThread.awaitTermination(THREAD_POLL_WAIT_TERMINATION_TIME, 
TimeUnit.SECONDS);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         logger.error("Unexpected interruption when waiting for reportThread to 
end", e);
@@ -388,7 +390,7 @@ public class MetaGroupMember extends RaftMember {
     if (hardLinkCleanerThread != null) {
       hardLinkCleanerThread.shutdownNow();
       try {
-        hardLinkCleanerThread.awaitTermination(10, TimeUnit.SECONDS);
+        
hardLinkCleanerThread.awaitTermination(THREAD_POLL_WAIT_TERMINATION_TIME, 
TimeUnit.SECONDS);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         logger.error("Unexpected interruption when waiting for hardlinkCleaner 
to end", e);
@@ -638,6 +640,19 @@ public class MetaGroupMember extends RaftMember {
     throw new ConfigInconsistentException();
   }
 
+
+  /**
+   * Rejects an election whose elector is not a member of the meta group; otherwise defers to
+   * the log-progress check of the parent class. The membership check is skipped while
+   * partitionTable is still null.
+   *
+   * @param electionRequest the election request, carrying the elector to validate
+   * @return Response.RESPONSE_NODE_IS_NOT_IN_GROUP if the partition table is present and the
+   *     elector is not in allNodes, otherwise the result of super.checkElectorLogProgress
+   */
+  @Override
+  long checkElectorLogProgress(ElectionRequest electionRequest) {
+    Node elector = electionRequest.getElector();
+    // check if the node is in the group
+    if (partitionTable != null && !allNodes.contains(elector)) {
+      // placeholders are, in order: member name, elector, group
+      logger.info("{}: the elector {} is not in the meta group {}, so reject this election.",
+          name, elector, getPartitionGroup());
+      return Response.RESPONSE_NODE_IS_NOT_IN_GROUP;
+    }
+    return super.checkElectorLogProgress(electionRequest);
+  }
+
   /**
    * Process the heartbeat request from a valid leader. Generate and tell the 
leader the identifier
    * of the node if necessary. If the partition table is missing, use the one 
from the request or
@@ -704,6 +719,8 @@ public class MetaGroupMember extends RaftMember {
     updateNodeList(newTable.getAllNodes());
 
     startSubServers();
+
+    getDataClusterServer().pullSnapshots();
   }
 
   private void updateNodeList(Collection<Node> nodes) {
@@ -1955,7 +1972,7 @@ public class MetaGroupMember extends RaftMember {
         return collectMigrationStatusSync(node);
       }
     } catch (TException | InterruptedException e) {
-      logger.warn("Cannot get the status of all nodes", e);
+      logger.error("{}: Cannot get the status of node {}", name, node, e);
     }
     return null;
   }
@@ -2219,7 +2236,7 @@ public class MetaGroupMember extends RaftMember {
       } else if (thisNode.equals(leader.get())) {
         // as the old node is removed, it cannot know this by heartbeat or 
log, so it should be
         // directly kicked out of the cluster
-        exileNode(removeNodeLog);
+        getAppendLogThreadPool().submit(() -> exileNode(removeNodeLog));
       }
 
       if (logger.isDebugEnabled()) {
@@ -2291,6 +2308,9 @@ public class MetaGroupMember extends RaftMember {
     syncLeader();
     Map<PartitionGroup, Integer> res = new HashMap<>();
     for (Node node: allNodes) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("{}: start to get migration status of {}", name, node);
+      }
       Map<PartitionGroup, Integer> oneNodeRes;
       if (node.equals(thisNode)) {
         oneNodeRes = collectMigrationStatus();
@@ -2312,8 +2332,11 @@ public class MetaGroupMember extends RaftMember {
    * @return key: data group; value: slot num in data migration
    */
   public Map<PartitionGroup, Integer> collectMigrationStatus() {
-    logger.info("{}: start to collect migration status.", name);
+    logger.info("{}: start to collect migration status locally.", name);
     Map<PartitionGroup, Integer> groupSlotMap = new HashMap<>();
+    if (getPartitionTable() == null) {
+      return groupSlotMap;
+    }
     Map<RaftNode, DataGroupMember> headerMap = 
getDataClusterServer().getHeaderGroupMap();
     waitUtil(getPartitionTable().getLastMetaLogIndex());
     synchronized (headerMap) {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 5f5579f..813dbf3 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.cluster.server.member;
 
+import static 
org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME;
+
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.IOException;
 import java.net.SocketTimeoutException;
@@ -339,9 +341,9 @@ public abstract class RaftMember {
     catchUpService.shutdownNow();
     appendLogThreadPool.shutdownNow();
     try {
-      heartBeatService.awaitTermination(10, TimeUnit.SECONDS);
-      catchUpService.awaitTermination(10, TimeUnit.SECONDS);
-      appendLogThreadPool.awaitTermination(10, TimeUnit.SECONDS);
+      heartBeatService.awaitTermination(THREAD_POLL_WAIT_TERMINATION_TIME, 
TimeUnit.SECONDS);
+      catchUpService.awaitTermination(THREAD_POLL_WAIT_TERMINATION_TIME, 
TimeUnit.SECONDS);
+      appendLogThreadPool.awaitTermination(THREAD_POLL_WAIT_TERMINATION_TIME, 
TimeUnit.SECONDS);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       logger.error("Unexpected interruption when waiting for heartBeatService 
and catchUpService "
@@ -350,7 +352,7 @@ public abstract class RaftMember {
     if (serialToParallelPool != null) {
       serialToParallelPool.shutdownNow();
       try {
-        serialToParallelPool.awaitTermination(10, TimeUnit.SECONDS);
+        
serialToParallelPool.awaitTermination(THREAD_POLL_WAIT_TERMINATION_TIME, 
TimeUnit.SECONDS);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         logger.error("Unexpected interruption when waiting for asyncThreadPool 
to end", e);
@@ -360,7 +362,7 @@ public abstract class RaftMember {
     if (commitLogPool != null) {
       commitLogPool.shutdownNow();
       try {
-        commitLogPool.awaitTermination(10, TimeUnit.SECONDS);
+        commitLogPool.awaitTermination(THREAD_POLL_WAIT_TERMINATION_TIME, 
TimeUnit.SECONDS);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         logger.error("Unexpected interruption when waiting for commitLogPool 
to end", e);
@@ -1139,12 +1141,6 @@ public abstract class RaftMember {
     long thatLastLogTerm = electionRequest.getLastLogTerm();
     Node elector = electionRequest.getElector();
 
-    // check if the node is in the group
-    if (!allNodes.contains(elector)) {
-      logger.info("{}: the elector {} is not in the group, so reject this 
election.", name, elector);
-      return Response.RESPONSE_NODE_IS_NOT_IN_GROUP;
-    }
-
     // check the log progress of the elector
     long resp = checkLogProgress(thatLastLogIndex, thatLastLogTerm);
     if (resp == Response.RESPONSE_AGREE) {
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
 
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 2eb61bd..4347539 100644
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -515,9 +515,10 @@ public class MetaGroupMemberTest extends MemberTest {
 
     dataClusterServer.setPartitionTable(partitionTable);
     for (PartitionGroup partitionGroup : partitionGroups) {
+      RaftNode header = new RaftNode(partitionGroup.getHeader(), 
partitionGroup.getId());
       DataGroupMember dataGroupMember = getDataGroupMember(partitionGroup, 
TestUtils.getNode(0));
       dataGroupMember.start();
-      dataClusterServer.addDataGroupMember(dataGroupMember);
+      dataClusterServer.addDataGroupMember(dataGroupMember, header);
     }
   }
 

Reply via email to