This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a18c902d82 [IOTDB-3557] Maintain online data nodes by load manager
(#6575)
a18c902d82 is described below
commit a18c902d82057d41e49de539f811987d3f02ce85
Author: 任宇华 <[email protected]>
AuthorDate: Tue Jul 5 08:39:38 2022 +0800
[IOTDB-3557] Maintain online data nodes by load manager (#6575)
---
.../confignode/manager/ClusterSchemaManager.java | 2 +-
.../iotdb/confignode/manager/ConfigManager.java | 4 +-
.../iotdb/confignode/manager/ConsensusManager.java | 5 +-
.../iotdb/confignode/manager/NodeManager.java | 34 +++++------
.../iotdb/confignode/manager/PartitionManager.java | 2 +-
.../confignode/manager/PermissionManager.java | 2 +-
.../iotdb/confignode/manager/UDFManager.java | 20 +++----
.../iotdb/confignode/manager/load/LoadManager.java | 25 ++++++---
.../manager/load/balancer/RegionBalancer.java | 8 ++-
.../iotdb/confignode/persistence/NodeInfo.java | 65 +++++++++++-----------
.../procedure/env/ConfigNodeProcedureEnv.java | 2 +-
.../iotdb/confignode/persistence/NodeInfoTest.java | 4 +-
12 files changed, 93 insertions(+), 80 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
index 235377855e..de7af53548 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
@@ -197,7 +197,7 @@ public class ClusterSchemaManager {
// Get all StorageGroupSchemas
Map<String, TStorageGroupSchema> storageGroupSchemaMap =
getMatchedStorageGroupSchemasByName(getStorageGroupNames());
- int dataNodeNum = getNodeManager().getOnlineDataNodeCount();
+ int dataNodeNum = getNodeManager().getRegisteredDataNodeCount();
int totalCpuCoreNum = getNodeManager().getTotalCpuCoreCount();
int storageGroupNum = storageGroupSchemaMap.size();
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 6ffac63e68..177dd9765f 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -210,7 +210,7 @@ public class ConfigManager implements IManager {
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
List<TConfigNodeLocation> configNodeLocations =
getNodeManager().getRegisteredConfigNodes();
List<TDataNodeLocation> dataNodeInfoLocations =
- getNodeManager().getOnlineDataNodes(-1).stream()
+ getNodeManager().getRegisteredDataNodes(-1).stream()
.map(TDataNodeInfo::getLocation)
.collect(Collectors.toList());
Map<Integer, String> nodeStatus = new HashMap<>();
@@ -783,7 +783,7 @@ public class ConfigManager implements IManager {
dataNodeInfosResp.setStatus(status);
return dataNodeInfosResp;
}
- List<TDataNodesInfo> dataNodesInfoList =
nodeManager.getOnlineDataNodesInfoList();
+ List<TDataNodesInfo> dataNodesInfoList =
nodeManager.getRegisteredDataNodesInfoList();
RegionInfoListResp regionsInfoDataSet =
(RegionInfoListResp)
partitionManager.getRegionInfoList(getRegionsinfoReq);
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index 1667e368d8..fd5401cd83 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -184,9 +184,10 @@ public class ConsensusManager {
for (int retry = 0; retry < 50; retry++) {
Peer leaderPeer = consensusImpl.getLeader(consensusGroupId);
if (leaderPeer != null) {
- List<TConfigNodeLocation> onlineConfigNodes =
getNodeManager().getRegisteredConfigNodes();
+ List<TConfigNodeLocation> registeredConfigNodes =
+ getNodeManager().getRegisteredConfigNodes();
TConfigNodeLocation leaderLocation =
- onlineConfigNodes.stream()
+ registeredConfigNodes.stream()
.filter(leader ->
leader.getConsensusEndPoint().equals(leaderPeer.getEndpoint()))
.findFirst()
.orElse(null);
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index 84dcf4832d..a76089b205 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -103,7 +103,7 @@ public class NodeManager {
DataNodeConfigurationResp dataSet = new DataNodeConfigurationResp();
TSStatus status = new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
status.setMessage("registerDataNode success.");
- if (nodeInfo.isOnlineDataNode(req.getInfo().getLocation())) {
+ if (nodeInfo.isRegisteredDataNode(req.getInfo().getLocation())) {
status.setCode(TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode());
status.setMessage("DataNode already registered.");
} else if (req.getInfo().getLocation().getDataNodeId() < 0) {
@@ -127,7 +127,7 @@ public class NodeManager {
*/
public TSStatus activateDataNode(ActivateDataNodePlan req) {
TSStatus status = new TSStatus();
- if (nodeInfo.isOnlineDataNode(req.getInfo().getLocation())) {
+ if (nodeInfo.isRegisteredDataNode(req.getInfo().getLocation())) {
status.setCode(TSStatusCode.DATANODE_ALREADY_ACTIVATED.getStatusCode());
status.setMessage("DataNode already activated.");
} else {
@@ -152,10 +152,10 @@ public class NodeManager {
/**
* Only leader use this interface
*
- * @return The number of online DataNodes
+ * @return The number of registered DataNodes
*/
- public int getOnlineDataNodeCount() {
- return nodeInfo.getOnlineDataNodeCount();
+ public int getRegisteredDataNodeCount() {
+ return nodeInfo.getRegisteredDataNodeCount();
}
/**
@@ -171,18 +171,18 @@ public class NodeManager {
* Only leader use this interface
*
* @param dataNodeId Specific DataNodeId
- * @return All online DataNodes if dataNodeId equals -1. And return the
specific DataNode
+ * @return All registered DataNodes if dataNodeId equals -1. And return the
specific DataNode
* otherwise.
*/
- public List<TDataNodeInfo> getOnlineDataNodes(int dataNodeId) {
- return nodeInfo.getOnlineDataNodes(dataNodeId);
+ public List<TDataNodeInfo> getRegisteredDataNodes(int dataNodeId) {
+ return nodeInfo.getRegisteredDataNodes(dataNodeId);
}
- public List<TDataNodesInfo> getOnlineDataNodesInfoList() {
+ public List<TDataNodesInfo> getRegisteredDataNodesInfoList() {
List<TDataNodesInfo> dataNodesLocations = new ArrayList<>();
- List<TDataNodeInfo> onlineDataNodes = this.getOnlineDataNodes(-1);
- if (onlineDataNodes != null) {
- onlineDataNodes.forEach(
+ List<TDataNodeInfo> registeredDataNodes = this.getRegisteredDataNodes(-1);
+ if (registeredDataNodes != null) {
+ registeredDataNodes.forEach(
(dataNodeInfo) -> {
TDataNodesInfo tDataNodesLocation = new TDataNodesInfo();
tDataNodesLocation.setDataNodeId(dataNodeInfo.getLocation().getDataNodeId());
@@ -386,12 +386,12 @@ public class NodeManager {
}
public List<TSStatus> flush(TFlushReq req) {
- List<TDataNodeInfo> onlineDataNodes =
- configManager.getNodeManager().getOnlineDataNodes(req.dataNodeId);
+ List<TDataNodeInfo> registeredDataNodes =
+ configManager.getNodeManager().getRegisteredDataNodes(req.dataNodeId);
List<TSStatus> dataNodeResponseStatus =
- Collections.synchronizedList(new ArrayList<>(onlineDataNodes.size()));
- CountDownLatch countDownLatch = new CountDownLatch(onlineDataNodes.size());
- for (TDataNodeInfo dataNodeInfo : onlineDataNodes) {
+ Collections.synchronizedList(new
ArrayList<>(registeredDataNodes.size()));
+ CountDownLatch countDownLatch = new
CountDownLatch(registeredDataNodes.size());
+ for (TDataNodeInfo dataNodeInfo : registeredDataNodes) {
AsyncDataNodeClientPool.getInstance()
.flush(
dataNodeInfo.getLocation().getInternalEndPoint(),
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index 2940ebfe14..557d9527bc 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -298,7 +298,7 @@ public class PartitionManager {
unreadyStorageGroupMap.put(storageGroup, 1);
}
}
- if (getNodeManager().getOnlineDataNodeCount() < leastDataNode) {
+ if (getNodeManager().getRegisteredDataNodeCount() < leastDataNode) {
// Make sure DataNodes enough
throw new NotEnoughDataNodeException();
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
index b90118b35a..ad3e361645 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
@@ -98,7 +98,7 @@ public class PermissionManager {
* permissions related to the user or role
*/
public TSStatus invalidateCache(String username, String roleName) {
- List<TDataNodeInfo> allDataNodes =
configManager.getNodeManager().getOnlineDataNodes(-1);
+ List<TDataNodeInfo> allDataNodes =
configManager.getNodeManager().getRegisteredDataNodes(-1);
TInvalidatePermissionCacheReq req = new TInvalidatePermissionCacheReq();
TSStatus status;
req.setUsername(username);
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
index 18c4098c00..22b8f112d8 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
@@ -80,15 +80,15 @@ public class UDFManager {
private List<TSStatus> createFunctionOnDataNodes(
String functionName, String className, List<String> uris) {
- final List<TDataNodeInfo> onlineDataNodes =
- configManager.getNodeManager().getOnlineDataNodes(-1);
+ final List<TDataNodeInfo> registeredDataNodes =
+ configManager.getNodeManager().getRegisteredDataNodes(-1);
final List<TSStatus> dataNodeResponseStatus =
- Collections.synchronizedList(new ArrayList<>(onlineDataNodes.size()));
- final CountDownLatch countDownLatch = new
CountDownLatch(onlineDataNodes.size());
+ Collections.synchronizedList(new
ArrayList<>(registeredDataNodes.size()));
+ final CountDownLatch countDownLatch = new
CountDownLatch(registeredDataNodes.size());
final TCreateFunctionRequest request =
new TCreateFunctionRequest(functionName, className, uris);
- for (TDataNodeInfo dataNodeInfo : onlineDataNodes) {
+ for (TDataNodeInfo dataNodeInfo : registeredDataNodes) {
final TEndPoint endPoint =
dataNodeInfo.getLocation().getInternalEndPoint();
AsyncDataNodeClientPool.getInstance()
.createFunction(
@@ -125,14 +125,14 @@ public class UDFManager {
}
private List<TSStatus> dropFunctionOnDataNodes(String functionName) {
- final List<TDataNodeInfo> onlineDataNodes =
- configManager.getNodeManager().getOnlineDataNodes(-1);
+ final List<TDataNodeInfo> registeredDataNodes =
+ configManager.getNodeManager().getRegisteredDataNodes(-1);
final List<TSStatus> dataNodeResponseStatus =
- Collections.synchronizedList(new ArrayList<>(onlineDataNodes.size()));
- final CountDownLatch countDownLatch = new
CountDownLatch(onlineDataNodes.size());
+ Collections.synchronizedList(new
ArrayList<>(registeredDataNodes.size()));
+ final CountDownLatch countDownLatch = new
CountDownLatch(registeredDataNodes.size());
final TDropFunctionRequest request = new
TDropFunctionRequest(functionName);
- for (TDataNodeInfo dataNodeInfo : onlineDataNodes) {
+ for (TDataNodeInfo dataNodeInfo : registeredDataNodes) {
final TEndPoint endPoint =
dataNodeInfo.getLocation().getInternalEndPoint();
AsyncDataNodeClientPool.getInstance()
.dropFunction(
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index d01a26fc97..b5ea29f5b0 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -248,8 +248,8 @@ public class LoadManager {
/** loop body of the heartbeat thread */
private void heartbeatLoopBody() {
if (getConsensusManager().isLeader()) {
- // Send heartbeat requests to all the online DataNodes
- pingOnlineDataNodes(getNodeManager().getOnlineDataNodes(-1));
+ // Send heartbeat requests to all the registered DataNodes
+ pingRegisteredDataNodes(getNodeManager().getRegisteredDataNodes(-1));
// Send heartbeat requests to all the registered ConfigNodes
pingRegisteredConfigNodes(getNodeManager().getRegisteredConfigNodes());
}
@@ -274,13 +274,13 @@ public class LoadManager {
}
/**
- * Send heartbeat requests to all the online DataNodes
+ * Send heartbeat requests to all the registered DataNodes
*
- * @param onlineDataNodes DataNodes that currently online
+ * @param registeredDataNodes DataNodes that are registered in the cluster
*/
- private void pingOnlineDataNodes(List<TDataNodeInfo> onlineDataNodes) {
+ private void pingRegisteredDataNodes(List<TDataNodeInfo>
registeredDataNodes) {
// Send heartbeat requests
- for (TDataNodeInfo dataNodeInfo : onlineDataNodes) {
+ for (TDataNodeInfo dataNodeInfo : registeredDataNodes) {
DataNodeHeartbeatHandler handler =
new DataNodeHeartbeatHandler(
dataNodeInfo.getLocation(),
@@ -296,7 +296,7 @@ public class LoadManager {
}
/**
- * Send heartbeat requests to all the online ConfigNodes
+ * Send heartbeat requests to all the registered ConfigNodes
*
* @param registeredConfigNodes ConfigNodes that registered in cluster
*/
@@ -338,6 +338,17 @@ public class LoadManager {
.collect(Collectors.toList());
}
+ public List<TDataNodeInfo> getOnlineDataNodes(int dataNodeId) {
+ return getNodeManager().getRegisteredDataNodes(dataNodeId).stream()
+ .filter(
+ registeredDataNode ->
+ heartbeatCacheMap
+ .get(registeredDataNode.getLocation().getDataNodeId())
+ .getNodeStatus()
+ .equals(NodeStatus.Running))
+ .collect(Collectors.toList());
+ }
+
private ConsensusManager getConsensusManager() {
return configManager.getConsensusManager();
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
index 421a726c9a..c028f61845 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -62,7 +62,9 @@ public class RegionBalancer {
CreateRegionGroupsPlan createRegionGroupsPlan = new
CreateRegionGroupsPlan();
IRegionAllocator regionAllocator = genRegionAllocator();
- List<TDataNodeInfo> onlineDataNodes =
getNodeManager().getOnlineDataNodes(-1);
+ // TODO: Once the IT framework is complete, change the
following code to:
+ // List<TDataNodeInfo> onlineDataNodes =
getLoadManager().getOnlineDataNodes(-1);
+ List<TDataNodeInfo> registeredDataNodes =
getNodeManager().getRegisteredDataNodes(-1);
List<TRegionReplicaSet> allocatedRegions =
getPartitionManager().getAllReplicaSets();
for (Map.Entry<String, Integer> entry : allotmentMap.entrySet()) {
@@ -78,7 +80,7 @@ public class RegionBalancer {
: storageGroupSchema.getDataReplicationFactor();
// Check validity
- if (onlineDataNodes.size() < replicationFactor) {
+ if (registeredDataNodes.size() < replicationFactor) {
throw new NotEnoughDataNodeException();
}
@@ -86,7 +88,7 @@ public class RegionBalancer {
// Generate allocation plan
TRegionReplicaSet newRegion =
regionAllocator.allocateRegion(
- onlineDataNodes,
+ registeredDataNodes,
allocatedRegions,
replicationFactor,
new TConsensusGroupId(
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
index a4769da54e..e6040b86ca 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
@@ -84,10 +84,10 @@ public class NodeInfo implements SnapshotProcessor {
private final ReentrantReadWriteLock configNodeInfoReadWriteLock;
private final Set<TConfigNodeLocation> registeredConfigNodes;
- // Online DataNodes
+ // Registered DataNodes
private final ReentrantReadWriteLock dataNodeInfoReadWriteLock;
private final AtomicInteger nextNodeId = new AtomicInteger(0);
- private final ConcurrentNavigableMap<Integer, TDataNodeInfo> onlineDataNodes
=
+ private final ConcurrentNavigableMap<Integer, TDataNodeInfo>
registeredDataNodes =
new ConcurrentSkipListMap<>();
// For remove or draining DataNode
@@ -110,7 +110,7 @@ public class NodeInfo implements SnapshotProcessor {
Metric.CONFIG_NODE.toString(),
MetricLevel.CORE,
registeredConfigNodes,
- o -> getOnlineDataNodeCount(),
+ o -> getRegisteredDataNodeCount(),
Tag.NAME.toString(),
"online");
MetricsService.getInstance()
@@ -118,26 +118,26 @@ public class NodeInfo implements SnapshotProcessor {
.getOrCreateAutoGauge(
Metric.DATA_NODE.toString(),
MetricLevel.CORE,
- onlineDataNodes,
+ registeredDataNodes,
Map::size,
Tag.NAME.toString(),
"online");
}
}
- /** @return true if the specific DataNode is now online */
- public boolean isOnlineDataNode(TDataNodeLocation info) {
+ /** @return true if the specific DataNode is registered */
+ public boolean isRegisteredDataNode(TDataNodeLocation dataNodeLocation) {
boolean result = false;
dataNodeInfoReadWriteLock.readLock().lock();
- int originalDataNodeId = info.getDataNodeId();
+ int originalDataNodeId = dataNodeLocation.getDataNodeId();
try {
- for (Map.Entry<Integer, TDataNodeInfo> entry :
onlineDataNodes.entrySet()) {
- info.setDataNodeId(entry.getKey());
- if (entry.getValue().getLocation().equals(info)) {
+ for (Map.Entry<Integer, TDataNodeInfo> entry :
registeredDataNodes.entrySet()) {
+ dataNodeLocation.setDataNodeId(entry.getKey());
+ if (entry.getValue().getLocation().equals(dataNodeLocation)) {
result = true;
break;
}
- info.setDataNodeId(originalDataNodeId);
+ dataNodeLocation.setDataNodeId(originalDataNodeId);
}
} finally {
dataNodeInfoReadWriteLock.readLock().unlock();
@@ -194,7 +194,7 @@ public class NodeInfo implements SnapshotProcessor {
TDataNodeInfo info = activateDataNodePlan.getInfo();
dataNodeInfoReadWriteLock.writeLock().lock();
try {
- onlineDataNodes.put(info.getLocation().getDataNodeId(), info);
+ registeredDataNodes.put(info.getLocation().getDataNodeId(), info);
} finally {
dataNodeInfoReadWriteLock.writeLock().unlock();
}
@@ -216,12 +216,12 @@ public class NodeInfo implements SnapshotProcessor {
dataNodeInfoReadWriteLock.readLock().lock();
try {
if (dataNodeId == -1) {
- result.setDataNodeInfoMap(new HashMap<>(onlineDataNodes));
+ result.setDataNodeInfoMap(new HashMap<>(registeredDataNodes));
} else {
result.setDataNodeInfoMap(
- onlineDataNodes.get(dataNodeId) == null
+ registeredDataNodes.get(dataNodeId) == null
? new HashMap<>(0)
- : Collections.singletonMap(dataNodeId,
onlineDataNodes.get(dataNodeId)));
+ : Collections.singletonMap(dataNodeId,
registeredDataNodes.get(dataNodeId)));
}
} finally {
dataNodeInfoReadWriteLock.readLock().unlock();
@@ -230,12 +230,12 @@ public class NodeInfo implements SnapshotProcessor {
return result;
}
- /** Return the number of online DataNodes */
- public int getOnlineDataNodeCount() {
+ /** Return the number of registered DataNodes */
+ public int getRegisteredDataNodeCount() {
int result;
dataNodeInfoReadWriteLock.readLock().lock();
try {
- result = onlineDataNodes.size();
+ result = registeredDataNodes.size();
} finally {
dataNodeInfoReadWriteLock.readLock().unlock();
}
@@ -247,7 +247,7 @@ public class NodeInfo implements SnapshotProcessor {
int result = 0;
dataNodeInfoReadWriteLock.readLock().lock();
try {
- for (TDataNodeInfo info : onlineDataNodes.values()) {
+ for (TDataNodeInfo info : registeredDataNodes.values()) {
result += info.getCpuCoreNum();
}
} finally {
@@ -257,21 +257,20 @@ public class NodeInfo implements SnapshotProcessor {
}
/**
- * Return the specific online DataNode
+ * Return the specific registered DataNode
*
* @param dataNodeId Specific DataNodeId
- * @return All online DataNodes if dataNodeId equals -1. And return the
specific DataNode
+ * @return All registered DataNodes if dataNodeId equals -1. And return the
specific DataNode
* otherwise.
*/
- public List<TDataNodeInfo> getOnlineDataNodes(int dataNodeId) {
+ public List<TDataNodeInfo> getRegisteredDataNodes(int dataNodeId) {
List<TDataNodeInfo> result;
dataNodeInfoReadWriteLock.readLock().lock();
try {
- // TODO: Check DataNode status, ensure the returned DataNode isn't
removed
if (dataNodeId == -1) {
- result = new ArrayList<>(onlineDataNodes.values());
+ result = new ArrayList<>(registeredDataNodes.values());
} else {
- result = Collections.singletonList(onlineDataNodes.get(dataNodeId));
+ result =
Collections.singletonList(registeredDataNodes.get(dataNodeId));
}
} finally {
dataNodeInfoReadWriteLock.readLock().unlock();
@@ -379,7 +378,7 @@ public class NodeInfo implements SnapshotProcessor {
ReadWriteIOUtils.write(nextNodeId.get(), fileOutputStream);
- serializeOnlineDataNode(fileOutputStream, protocol);
+ serializeRegisteredDataNode(fileOutputStream, protocol);
serializeDrainingDataNodes(fileOutputStream, protocol);
@@ -403,10 +402,10 @@ public class NodeInfo implements SnapshotProcessor {
}
}
- private void serializeOnlineDataNode(OutputStream outputStream, TProtocol
protocol)
+ private void serializeRegisteredDataNode(OutputStream outputStream,
TProtocol protocol)
throws IOException, TException {
- ReadWriteIOUtils.write(onlineDataNodes.size(), outputStream);
- for (Entry<Integer, TDataNodeInfo> entry : onlineDataNodes.entrySet()) {
+ ReadWriteIOUtils.write(registeredDataNodes.size(), outputStream);
+ for (Entry<Integer, TDataNodeInfo> entry : registeredDataNodes.entrySet())
{
ReadWriteIOUtils.write(entry.getKey(), outputStream);
entry.getValue().write(protocol);
}
@@ -442,7 +441,7 @@ public class NodeInfo implements SnapshotProcessor {
nextNodeId.set(ReadWriteIOUtils.readInt(fileInputStream));
- deserializeOnlineDataNode(fileInputStream, protocol);
+ deserializeRegisteredDataNode(fileInputStream, protocol);
deserializeDrainingDataNodes(fileInputStream, protocol);
@@ -452,14 +451,14 @@ public class NodeInfo implements SnapshotProcessor {
}
}
- private void deserializeOnlineDataNode(InputStream inputStream, TProtocol
protocol)
+ private void deserializeRegisteredDataNode(InputStream inputStream,
TProtocol protocol)
throws IOException, TException {
int size = ReadWriteIOUtils.readInt(inputStream);
while (size > 0) {
int dataNodeId = ReadWriteIOUtils.readInt(inputStream);
TDataNodeInfo dataNodeInfo = new TDataNodeInfo();
dataNodeInfo.read(protocol);
- onlineDataNodes.put(dataNodeId, dataNodeInfo);
+ registeredDataNodes.put(dataNodeId, dataNodeInfo);
size--;
}
}
@@ -493,7 +492,7 @@ public class NodeInfo implements SnapshotProcessor {
public void clear() {
nextNodeId.set(0);
- onlineDataNodes.clear();
+ registeredDataNodes.clear();
drainingDataNodes.clear();
registeredConfigNodes.clear();
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index e2e5f87cae..82981f54d6 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -106,7 +106,7 @@ public class ConfigNodeProcedureEnv {
if (skipForTest) {
return invalidCacheResult;
}
- List<TDataNodeInfo> allDataNodes =
configManager.getNodeManager().getOnlineDataNodes(-1);
+ List<TDataNodeInfo> allDataNodes =
configManager.getNodeManager().getRegisteredDataNodes(-1);
TInvalidateCacheReq invalidateCacheReq = new TInvalidateCacheReq();
invalidateCacheReq.setStorageGroup(true);
invalidateCacheReq.setFullPath(storageGroupName);
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
index ba79e6d101..20976cba96 100644
---
a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
@@ -78,7 +78,7 @@ public class NodeInfoTest {
nodeInfo.setDrainingDataNodes(drainingDataNodes_before);
int nextId = nodeInfo.getNextNodeId();
- List<TDataNodeInfo> onlineDataNodes_before =
nodeInfo.getOnlineDataNodes(-1);
+ List<TDataNodeInfo> onlineDataNodes_before =
nodeInfo.getRegisteredDataNodes(-1);
nodeInfo.processTakeSnapshot(snapshotDir);
nodeInfo.clear();
@@ -89,7 +89,7 @@ public class NodeInfoTest {
Set<TDataNodeLocation> drainingDataNodes_after =
nodeInfo.getDrainingDataNodes();
Assert.assertEquals(drainingDataNodes_before, drainingDataNodes_after);
- List<TDataNodeInfo> onlineDataNodes_after =
nodeInfo.getOnlineDataNodes(-1);
+ List<TDataNodeInfo> onlineDataNodes_after =
nodeInfo.getRegisteredDataNodes(-1);
Assert.assertEquals(onlineDataNodes_before, onlineDataNodes_after);
}