This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a18c902d82 [IOTDB-3557] Maintain online data nodes by load manager 
(#6575)
a18c902d82 is described below

commit a18c902d82057d41e49de539f811987d3f02ce85
Author: 任宇华 <[email protected]>
AuthorDate: Tue Jul 5 08:39:38 2022 +0800

    [IOTDB-3557] Maintain online data nodes by load manager (#6575)
---
 .../confignode/manager/ClusterSchemaManager.java   |  2 +-
 .../iotdb/confignode/manager/ConfigManager.java    |  4 +-
 .../iotdb/confignode/manager/ConsensusManager.java |  5 +-
 .../iotdb/confignode/manager/NodeManager.java      | 34 +++++------
 .../iotdb/confignode/manager/PartitionManager.java |  2 +-
 .../confignode/manager/PermissionManager.java      |  2 +-
 .../iotdb/confignode/manager/UDFManager.java       | 20 +++----
 .../iotdb/confignode/manager/load/LoadManager.java | 25 ++++++---
 .../manager/load/balancer/RegionBalancer.java      |  8 ++-
 .../iotdb/confignode/persistence/NodeInfo.java     | 65 +++++++++++-----------
 .../procedure/env/ConfigNodeProcedureEnv.java      |  2 +-
 .../iotdb/confignode/persistence/NodeInfoTest.java |  4 +-
 12 files changed, 93 insertions(+), 80 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
index 235377855e..de7af53548 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
@@ -197,7 +197,7 @@ public class ClusterSchemaManager {
     // Get all StorageGroupSchemas
     Map<String, TStorageGroupSchema> storageGroupSchemaMap =
         getMatchedStorageGroupSchemasByName(getStorageGroupNames());
-    int dataNodeNum = getNodeManager().getOnlineDataNodeCount();
+    int dataNodeNum = getNodeManager().getRegisteredDataNodeCount();
     int totalCpuCoreNum = getNodeManager().getTotalCpuCoreCount();
     int storageGroupNum = storageGroupSchemaMap.size();
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 6ffac63e68..177dd9765f 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -210,7 +210,7 @@ public class ConfigManager implements IManager {
     if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       List<TConfigNodeLocation> configNodeLocations = 
getNodeManager().getRegisteredConfigNodes();
       List<TDataNodeLocation> dataNodeInfoLocations =
-          getNodeManager().getOnlineDataNodes(-1).stream()
+          getNodeManager().getRegisteredDataNodes(-1).stream()
               .map(TDataNodeInfo::getLocation)
               .collect(Collectors.toList());
       Map<Integer, String> nodeStatus = new HashMap<>();
@@ -783,7 +783,7 @@ public class ConfigManager implements IManager {
       dataNodeInfosResp.setStatus(status);
       return dataNodeInfosResp;
     }
-    List<TDataNodesInfo> dataNodesInfoList = 
nodeManager.getOnlineDataNodesInfoList();
+    List<TDataNodesInfo> dataNodesInfoList = 
nodeManager.getRegisteredDataNodesInfoList();
     RegionInfoListResp regionsInfoDataSet =
         (RegionInfoListResp) 
partitionManager.getRegionInfoList(getRegionsinfoReq);
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index 1667e368d8..fd5401cd83 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -184,9 +184,10 @@ public class ConsensusManager {
     for (int retry = 0; retry < 50; retry++) {
       Peer leaderPeer = consensusImpl.getLeader(consensusGroupId);
       if (leaderPeer != null) {
-        List<TConfigNodeLocation> onlineConfigNodes = 
getNodeManager().getRegisteredConfigNodes();
+        List<TConfigNodeLocation> registeredConfigNodes =
+            getNodeManager().getRegisteredConfigNodes();
         TConfigNodeLocation leaderLocation =
-            onlineConfigNodes.stream()
+            registeredConfigNodes.stream()
                 .filter(leader -> 
leader.getConsensusEndPoint().equals(leaderPeer.getEndpoint()))
                 .findFirst()
                 .orElse(null);
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index 84dcf4832d..a76089b205 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -103,7 +103,7 @@ public class NodeManager {
     DataNodeConfigurationResp dataSet = new DataNodeConfigurationResp();
     TSStatus status = new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     status.setMessage("registerDataNode success.");
-    if (nodeInfo.isOnlineDataNode(req.getInfo().getLocation())) {
+    if (nodeInfo.isRegisteredDataNode(req.getInfo().getLocation())) {
       status.setCode(TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode());
       status.setMessage("DataNode already registered.");
     } else if (req.getInfo().getLocation().getDataNodeId() < 0) {
@@ -127,7 +127,7 @@ public class NodeManager {
    */
   public TSStatus activateDataNode(ActivateDataNodePlan req) {
     TSStatus status = new TSStatus();
-    if (nodeInfo.isOnlineDataNode(req.getInfo().getLocation())) {
+    if (nodeInfo.isRegisteredDataNode(req.getInfo().getLocation())) {
       status.setCode(TSStatusCode.DATANODE_ALREADY_ACTIVATED.getStatusCode());
       status.setMessage("DataNode already activated.");
     } else {
@@ -152,10 +152,10 @@ public class NodeManager {
   /**
    * Only leader use this interface
    *
-   * @return The number of online DataNodes
+   * @return The number of registered DataNodes
    */
-  public int getOnlineDataNodeCount() {
-    return nodeInfo.getOnlineDataNodeCount();
+  public int getRegisteredDataNodeCount() {
+    return nodeInfo.getRegisteredDataNodeCount();
   }
 
   /**
@@ -171,18 +171,18 @@ public class NodeManager {
    * Only leader use this interface
    *
    * @param dataNodeId Specific DataNodeId
-   * @return All online DataNodes if dataNodeId equals -1. And return the 
specific DataNode
+   * @return All registered DataNodes if dataNodeId equals -1. And return the 
specific DataNode
    *     otherwise.
    */
-  public List<TDataNodeInfo> getOnlineDataNodes(int dataNodeId) {
-    return nodeInfo.getOnlineDataNodes(dataNodeId);
+  public List<TDataNodeInfo> getRegisteredDataNodes(int dataNodeId) {
+    return nodeInfo.getRegisteredDataNodes(dataNodeId);
   }
 
-  public List<TDataNodesInfo> getOnlineDataNodesInfoList() {
+  public List<TDataNodesInfo> getRegisteredDataNodesInfoList() {
     List<TDataNodesInfo> dataNodesLocations = new ArrayList<>();
-    List<TDataNodeInfo> onlineDataNodes = this.getOnlineDataNodes(-1);
-    if (onlineDataNodes != null) {
-      onlineDataNodes.forEach(
+    List<TDataNodeInfo> registeredDataNodes = this.getRegisteredDataNodes(-1);
+    if (registeredDataNodes != null) {
+      registeredDataNodes.forEach(
           (dataNodeInfo) -> {
             TDataNodesInfo tDataNodesLocation = new TDataNodesInfo();
             
tDataNodesLocation.setDataNodeId(dataNodeInfo.getLocation().getDataNodeId());
@@ -386,12 +386,12 @@ public class NodeManager {
   }
 
   public List<TSStatus> flush(TFlushReq req) {
-    List<TDataNodeInfo> onlineDataNodes =
-        configManager.getNodeManager().getOnlineDataNodes(req.dataNodeId);
+    List<TDataNodeInfo> registeredDataNodes =
+        configManager.getNodeManager().getRegisteredDataNodes(req.dataNodeId);
     List<TSStatus> dataNodeResponseStatus =
-        Collections.synchronizedList(new ArrayList<>(onlineDataNodes.size()));
-    CountDownLatch countDownLatch = new CountDownLatch(onlineDataNodes.size());
-    for (TDataNodeInfo dataNodeInfo : onlineDataNodes) {
+        Collections.synchronizedList(new 
ArrayList<>(registeredDataNodes.size()));
+    CountDownLatch countDownLatch = new 
CountDownLatch(registeredDataNodes.size());
+    for (TDataNodeInfo dataNodeInfo : registeredDataNodes) {
       AsyncDataNodeClientPool.getInstance()
           .flush(
               dataNodeInfo.getLocation().getInternalEndPoint(),
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index 2940ebfe14..557d9527bc 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -298,7 +298,7 @@ public class PartitionManager {
         unreadyStorageGroupMap.put(storageGroup, 1);
       }
     }
-    if (getNodeManager().getOnlineDataNodeCount() < leastDataNode) {
+    if (getNodeManager().getRegisteredDataNodeCount() < leastDataNode) {
       // Make sure DataNodes enough
       throw new NotEnoughDataNodeException();
     }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
index b90118b35a..ad3e361645 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
@@ -98,7 +98,7 @@ public class PermissionManager {
    * permissions related to the user or role
    */
   public TSStatus invalidateCache(String username, String roleName) {
-    List<TDataNodeInfo> allDataNodes = 
configManager.getNodeManager().getOnlineDataNodes(-1);
+    List<TDataNodeInfo> allDataNodes = 
configManager.getNodeManager().getRegisteredDataNodes(-1);
     TInvalidatePermissionCacheReq req = new TInvalidatePermissionCacheReq();
     TSStatus status;
     req.setUsername(username);
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
index 18c4098c00..22b8f112d8 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
@@ -80,15 +80,15 @@ public class UDFManager {
 
   private List<TSStatus> createFunctionOnDataNodes(
       String functionName, String className, List<String> uris) {
-    final List<TDataNodeInfo> onlineDataNodes =
-        configManager.getNodeManager().getOnlineDataNodes(-1);
+    final List<TDataNodeInfo> registeredDataNodes =
+        configManager.getNodeManager().getRegisteredDataNodes(-1);
     final List<TSStatus> dataNodeResponseStatus =
-        Collections.synchronizedList(new ArrayList<>(onlineDataNodes.size()));
-    final CountDownLatch countDownLatch = new 
CountDownLatch(onlineDataNodes.size());
+        Collections.synchronizedList(new 
ArrayList<>(registeredDataNodes.size()));
+    final CountDownLatch countDownLatch = new 
CountDownLatch(registeredDataNodes.size());
     final TCreateFunctionRequest request =
         new TCreateFunctionRequest(functionName, className, uris);
 
-    for (TDataNodeInfo dataNodeInfo : onlineDataNodes) {
+    for (TDataNodeInfo dataNodeInfo : registeredDataNodes) {
       final TEndPoint endPoint = 
dataNodeInfo.getLocation().getInternalEndPoint();
       AsyncDataNodeClientPool.getInstance()
           .createFunction(
@@ -125,14 +125,14 @@ public class UDFManager {
   }
 
   private List<TSStatus> dropFunctionOnDataNodes(String functionName) {
-    final List<TDataNodeInfo> onlineDataNodes =
-        configManager.getNodeManager().getOnlineDataNodes(-1);
+    final List<TDataNodeInfo> registeredDataNodes =
+        configManager.getNodeManager().getRegisteredDataNodes(-1);
     final List<TSStatus> dataNodeResponseStatus =
-        Collections.synchronizedList(new ArrayList<>(onlineDataNodes.size()));
-    final CountDownLatch countDownLatch = new 
CountDownLatch(onlineDataNodes.size());
+        Collections.synchronizedList(new 
ArrayList<>(registeredDataNodes.size()));
+    final CountDownLatch countDownLatch = new 
CountDownLatch(registeredDataNodes.size());
     final TDropFunctionRequest request = new 
TDropFunctionRequest(functionName);
 
-    for (TDataNodeInfo dataNodeInfo : onlineDataNodes) {
+    for (TDataNodeInfo dataNodeInfo : registeredDataNodes) {
       final TEndPoint endPoint = 
dataNodeInfo.getLocation().getInternalEndPoint();
       AsyncDataNodeClientPool.getInstance()
           .dropFunction(
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index d01a26fc97..b5ea29f5b0 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -248,8 +248,8 @@ public class LoadManager {
   /** loop body of the heartbeat thread */
   private void heartbeatLoopBody() {
     if (getConsensusManager().isLeader()) {
-      // Send heartbeat requests to all the online DataNodes
-      pingOnlineDataNodes(getNodeManager().getOnlineDataNodes(-1));
+      // Send heartbeat requests to all the registered DataNodes
+      pingRegisteredDataNodes(getNodeManager().getRegisteredDataNodes(-1));
       // Send heartbeat requests to all the registered ConfigNodes
       pingRegisteredConfigNodes(getNodeManager().getRegisteredConfigNodes());
     }
@@ -274,13 +274,13 @@ public class LoadManager {
   }
 
   /**
-   * Send heartbeat requests to all the online DataNodes
+   * Send heartbeat requests to all the Registered DataNodes
    *
-   * @param onlineDataNodes DataNodes that currently online
+   * @param registeredDataNodes DataNodes that registered in cluster
    */
-  private void pingOnlineDataNodes(List<TDataNodeInfo> onlineDataNodes) {
+  private void pingRegisteredDataNodes(List<TDataNodeInfo> 
registeredDataNodes) {
     // Send heartbeat requests
-    for (TDataNodeInfo dataNodeInfo : onlineDataNodes) {
+    for (TDataNodeInfo dataNodeInfo : registeredDataNodes) {
       DataNodeHeartbeatHandler handler =
           new DataNodeHeartbeatHandler(
               dataNodeInfo.getLocation(),
@@ -296,7 +296,7 @@ public class LoadManager {
   }
 
   /**
-   * Send heartbeat requests to all the online ConfigNodes
+   * Send heartbeat requests to all the Registered ConfigNodes
    *
    * @param registeredConfigNodes ConfigNodes that registered in cluster
    */
@@ -338,6 +338,17 @@ public class LoadManager {
         .collect(Collectors.toList());
   }
 
+  public List<TDataNodeInfo> getOnlineDataNodes(int dataNodeId) {
+    return getNodeManager().getRegisteredDataNodes(dataNodeId).stream()
+        .filter(
+            registeredDataNode ->
+                heartbeatCacheMap
+                    .get(registeredDataNode.getLocation().getDataNodeId())
+                    .getNodeStatus()
+                    .equals(NodeStatus.Running))
+        .collect(Collectors.toList());
+  }
+
   private ConsensusManager getConsensusManager() {
     return configManager.getConsensusManager();
   }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
index 421a726c9a..c028f61845 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -62,7 +62,9 @@ public class RegionBalancer {
     CreateRegionGroupsPlan createRegionGroupsPlan = new 
CreateRegionGroupsPlan();
     IRegionAllocator regionAllocator = genRegionAllocator();
 
-    List<TDataNodeInfo> onlineDataNodes = 
getNodeManager().getOnlineDataNodes(-1);
+    // TODO: After waiting for the IT framework to complete, change the 
following code to:
+    //  List<TDataNodeInfo> onlineDataNodes = 
getLoadManager().getOnlineDataNodes(-1);
+    List<TDataNodeInfo> registeredDataNodes = 
getNodeManager().getRegisteredDataNodes(-1);
     List<TRegionReplicaSet> allocatedRegions = 
getPartitionManager().getAllReplicaSets();
 
     for (Map.Entry<String, Integer> entry : allotmentMap.entrySet()) {
@@ -78,7 +80,7 @@ public class RegionBalancer {
               : storageGroupSchema.getDataReplicationFactor();
 
       // Check validity
-      if (onlineDataNodes.size() < replicationFactor) {
+      if (registeredDataNodes.size() < replicationFactor) {
         throw new NotEnoughDataNodeException();
       }
 
@@ -86,7 +88,7 @@ public class RegionBalancer {
         // Generate allocation plan
         TRegionReplicaSet newRegion =
             regionAllocator.allocateRegion(
-                onlineDataNodes,
+                registeredDataNodes,
                 allocatedRegions,
                 replicationFactor,
                 new TConsensusGroupId(
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
index a4769da54e..e6040b86ca 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
@@ -84,10 +84,10 @@ public class NodeInfo implements SnapshotProcessor {
   private final ReentrantReadWriteLock configNodeInfoReadWriteLock;
   private final Set<TConfigNodeLocation> registeredConfigNodes;
 
-  // Online DataNodes
+  // Registered DataNodes
   private final ReentrantReadWriteLock dataNodeInfoReadWriteLock;
   private final AtomicInteger nextNodeId = new AtomicInteger(0);
-  private final ConcurrentNavigableMap<Integer, TDataNodeInfo> onlineDataNodes 
=
+  private final ConcurrentNavigableMap<Integer, TDataNodeInfo> 
registeredDataNodes =
       new ConcurrentSkipListMap<>();
 
   // For remove or draining DataNode
@@ -110,7 +110,7 @@ public class NodeInfo implements SnapshotProcessor {
               Metric.CONFIG_NODE.toString(),
               MetricLevel.CORE,
               registeredConfigNodes,
-              o -> getOnlineDataNodeCount(),
+              o -> getRegisteredDataNodeCount(),
               Tag.NAME.toString(),
               "online");
       MetricsService.getInstance()
@@ -118,26 +118,26 @@ public class NodeInfo implements SnapshotProcessor {
           .getOrCreateAutoGauge(
               Metric.DATA_NODE.toString(),
               MetricLevel.CORE,
-              onlineDataNodes,
+              registeredDataNodes,
               Map::size,
               Tag.NAME.toString(),
               "online");
     }
   }
 
-  /** @return true if the specific DataNode is now online */
-  public boolean isOnlineDataNode(TDataNodeLocation info) {
+  /** @return true if the specific DataNode is registered */
+  public boolean isRegisteredDataNode(TDataNodeLocation dataNodeLocation) {
     boolean result = false;
     dataNodeInfoReadWriteLock.readLock().lock();
-    int originalDataNodeId = info.getDataNodeId();
+    int originalDataNodeId = dataNodeLocation.getDataNodeId();
     try {
-      for (Map.Entry<Integer, TDataNodeInfo> entry : 
onlineDataNodes.entrySet()) {
-        info.setDataNodeId(entry.getKey());
-        if (entry.getValue().getLocation().equals(info)) {
+      for (Map.Entry<Integer, TDataNodeInfo> entry : 
registeredDataNodes.entrySet()) {
+        dataNodeLocation.setDataNodeId(entry.getKey());
+        if (entry.getValue().getLocation().equals(dataNodeLocation)) {
           result = true;
           break;
         }
-        info.setDataNodeId(originalDataNodeId);
+        dataNodeLocation.setDataNodeId(originalDataNodeId);
       }
     } finally {
       dataNodeInfoReadWriteLock.readLock().unlock();
@@ -194,7 +194,7 @@ public class NodeInfo implements SnapshotProcessor {
     TDataNodeInfo info = activateDataNodePlan.getInfo();
     dataNodeInfoReadWriteLock.writeLock().lock();
     try {
-      onlineDataNodes.put(info.getLocation().getDataNodeId(), info);
+      registeredDataNodes.put(info.getLocation().getDataNodeId(), info);
     } finally {
       dataNodeInfoReadWriteLock.writeLock().unlock();
     }
@@ -216,12 +216,12 @@ public class NodeInfo implements SnapshotProcessor {
     dataNodeInfoReadWriteLock.readLock().lock();
     try {
       if (dataNodeId == -1) {
-        result.setDataNodeInfoMap(new HashMap<>(onlineDataNodes));
+        result.setDataNodeInfoMap(new HashMap<>(registeredDataNodes));
       } else {
         result.setDataNodeInfoMap(
-            onlineDataNodes.get(dataNodeId) == null
+            registeredDataNodes.get(dataNodeId) == null
                 ? new HashMap<>(0)
-                : Collections.singletonMap(dataNodeId, 
onlineDataNodes.get(dataNodeId)));
+                : Collections.singletonMap(dataNodeId, 
registeredDataNodes.get(dataNodeId)));
       }
     } finally {
       dataNodeInfoReadWriteLock.readLock().unlock();
@@ -230,12 +230,12 @@ public class NodeInfo implements SnapshotProcessor {
     return result;
   }
 
-  /** Return the number of online DataNodes */
-  public int getOnlineDataNodeCount() {
+  /** Return the number of registered DataNodes */
+  public int getRegisteredDataNodeCount() {
     int result;
     dataNodeInfoReadWriteLock.readLock().lock();
     try {
-      result = onlineDataNodes.size();
+      result = registeredDataNodes.size();
     } finally {
       dataNodeInfoReadWriteLock.readLock().unlock();
     }
@@ -247,7 +247,7 @@ public class NodeInfo implements SnapshotProcessor {
     int result = 0;
     dataNodeInfoReadWriteLock.readLock().lock();
     try {
-      for (TDataNodeInfo info : onlineDataNodes.values()) {
+      for (TDataNodeInfo info : registeredDataNodes.values()) {
         result += info.getCpuCoreNum();
       }
     } finally {
@@ -257,21 +257,20 @@ public class NodeInfo implements SnapshotProcessor {
   }
 
   /**
-   * Return the specific online DataNode
+   * Return the specific registered DataNode
    *
    * @param dataNodeId Specific DataNodeId
-   * @return All online DataNodes if dataNodeId equals -1. And return the 
specific DataNode
+   * @return All registered DataNodes if dataNodeId equals -1. And return the 
specific DataNode
    *     otherwise.
    */
-  public List<TDataNodeInfo> getOnlineDataNodes(int dataNodeId) {
+  public List<TDataNodeInfo> getRegisteredDataNodes(int dataNodeId) {
     List<TDataNodeInfo> result;
     dataNodeInfoReadWriteLock.readLock().lock();
     try {
-      // TODO: Check DataNode status, ensure the returned DataNode isn't 
removed
       if (dataNodeId == -1) {
-        result = new ArrayList<>(onlineDataNodes.values());
+        result = new ArrayList<>(registeredDataNodes.values());
       } else {
-        result = Collections.singletonList(onlineDataNodes.get(dataNodeId));
+        result = 
Collections.singletonList(registeredDataNodes.get(dataNodeId));
       }
     } finally {
       dataNodeInfoReadWriteLock.readLock().unlock();
@@ -379,7 +378,7 @@ public class NodeInfo implements SnapshotProcessor {
 
       ReadWriteIOUtils.write(nextNodeId.get(), fileOutputStream);
 
-      serializeOnlineDataNode(fileOutputStream, protocol);
+      serializeRegisteredDataNode(fileOutputStream, protocol);
 
       serializeDrainingDataNodes(fileOutputStream, protocol);
 
@@ -403,10 +402,10 @@ public class NodeInfo implements SnapshotProcessor {
     }
   }
 
-  private void serializeOnlineDataNode(OutputStream outputStream, TProtocol 
protocol)
+  private void serializeRegisteredDataNode(OutputStream outputStream, 
TProtocol protocol)
       throws IOException, TException {
-    ReadWriteIOUtils.write(onlineDataNodes.size(), outputStream);
-    for (Entry<Integer, TDataNodeInfo> entry : onlineDataNodes.entrySet()) {
+    ReadWriteIOUtils.write(registeredDataNodes.size(), outputStream);
+    for (Entry<Integer, TDataNodeInfo> entry : registeredDataNodes.entrySet()) 
{
       ReadWriteIOUtils.write(entry.getKey(), outputStream);
       entry.getValue().write(protocol);
     }
@@ -442,7 +441,7 @@ public class NodeInfo implements SnapshotProcessor {
 
       nextNodeId.set(ReadWriteIOUtils.readInt(fileInputStream));
 
-      deserializeOnlineDataNode(fileInputStream, protocol);
+      deserializeRegisteredDataNode(fileInputStream, protocol);
 
       deserializeDrainingDataNodes(fileInputStream, protocol);
 
@@ -452,14 +451,14 @@ public class NodeInfo implements SnapshotProcessor {
     }
   }
 
-  private void deserializeOnlineDataNode(InputStream inputStream, TProtocol 
protocol)
+  private void deserializeRegisteredDataNode(InputStream inputStream, 
TProtocol protocol)
       throws IOException, TException {
     int size = ReadWriteIOUtils.readInt(inputStream);
     while (size > 0) {
       int dataNodeId = ReadWriteIOUtils.readInt(inputStream);
       TDataNodeInfo dataNodeInfo = new TDataNodeInfo();
       dataNodeInfo.read(protocol);
-      onlineDataNodes.put(dataNodeId, dataNodeInfo);
+      registeredDataNodes.put(dataNodeId, dataNodeInfo);
       size--;
     }
   }
@@ -493,7 +492,7 @@ public class NodeInfo implements SnapshotProcessor {
 
   public void clear() {
     nextNodeId.set(0);
-    onlineDataNodes.clear();
+    registeredDataNodes.clear();
     drainingDataNodes.clear();
     registeredConfigNodes.clear();
   }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index e2e5f87cae..82981f54d6 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -106,7 +106,7 @@ public class ConfigNodeProcedureEnv {
     if (skipForTest) {
       return invalidCacheResult;
     }
-    List<TDataNodeInfo> allDataNodes = 
configManager.getNodeManager().getOnlineDataNodes(-1);
+    List<TDataNodeInfo> allDataNodes = 
configManager.getNodeManager().getRegisteredDataNodes(-1);
     TInvalidateCacheReq invalidateCacheReq = new TInvalidateCacheReq();
     invalidateCacheReq.setStorageGroup(true);
     invalidateCacheReq.setFullPath(storageGroupName);
diff --git 
a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
 
b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
index ba79e6d101..20976cba96 100644
--- 
a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
+++ 
b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
@@ -78,7 +78,7 @@ public class NodeInfoTest {
     nodeInfo.setDrainingDataNodes(drainingDataNodes_before);
 
     int nextId = nodeInfo.getNextNodeId();
-    List<TDataNodeInfo> onlineDataNodes_before = 
nodeInfo.getOnlineDataNodes(-1);
+    List<TDataNodeInfo> onlineDataNodes_before = 
nodeInfo.getRegisteredDataNodes(-1);
 
     nodeInfo.processTakeSnapshot(snapshotDir);
     nodeInfo.clear();
@@ -89,7 +89,7 @@ public class NodeInfoTest {
     Set<TDataNodeLocation> drainingDataNodes_after = 
nodeInfo.getDrainingDataNodes();
     Assert.assertEquals(drainingDataNodes_before, drainingDataNodes_after);
 
-    List<TDataNodeInfo> onlineDataNodes_after = 
nodeInfo.getOnlineDataNodes(-1);
+    List<TDataNodeInfo> onlineDataNodes_after = 
nodeInfo.getRegisteredDataNodes(-1);
     Assert.assertEquals(onlineDataNodes_before, onlineDataNodes_after);
   }
 

Reply via email to