This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c710efbb51d Adjust some access modifier of confignode (#12505)
c710efbb51d is described below
commit c710efbb51d77715f04dbcbcc788a9d0890db9a7
Author: Li Yu Heng <[email protected]>
AuthorDate: Sat May 11 15:17:44 2024 +0800
Adjust some access modifier of confignode (#12505)
---
.../iotdb/confignode/manager/ConfigManager.java | 35 ++++++++++++++++------
.../iotdb/confignode/manager/load/LoadManager.java | 12 +++++---
.../manager/load/service/HeartbeatService.java | 16 +++++++---
.../iotdb/confignode/manager/node/NodeManager.java | 6 +++-
.../confignode/persistence/node/NodeInfo.java | 7 ++---
.../iotdb/confignode/service/ConfigNode.java | 28 ++++++++++++-----
.../thrift/ConfigNodeRPCServiceProcessor.java | 4 ++-
7 files changed, 77 insertions(+), 31 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 6e1a7aac762..1be83c2beba 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -238,7 +238,7 @@ public class ConfigManager implements IManager {
private final ClusterManager clusterManager;
/** Manage cluster node. */
- private final NodeManager nodeManager;
+ protected NodeManager nodeManager;
/** Manage cluster schema engine. */
private final ClusterSchemaManager clusterSchemaManager;
@@ -250,7 +250,7 @@ public class ConfigManager implements IManager {
private final PermissionManager permissionManager;
/** Manage load balancing. */
- private final LoadManager loadManager;
+ protected LoadManager loadManager;
/** Manage procedure. */
private final ProcedureManager procedureManager;
@@ -313,7 +313,7 @@ public class ConfigManager implements IManager {
// Build the manager module
this.clusterManager = new ClusterManager(this, clusterInfo);
- this.nodeManager = new NodeManager(this, nodeInfo);
+ setNodeManager(nodeInfo);
this.clusterSchemaManager =
new ClusterSchemaManager(
this,
@@ -333,7 +333,7 @@ public class ConfigManager implements IManager {
// LoadManager will register PipeManager as a listener.
// 2. keep RetryFailedTasksThread initialization after LoadManager
initialization,
// because RetryFailedTasksThread will keep a reference of LoadManager.
- this.loadManager = new LoadManager(this);
+ setLoadManager();
this.retryFailedTasksThread = new RetryFailedTasksThread(this);
this.clusterQuotaManager = new ClusterQuotaManager(this, quotaInfo);
@@ -344,6 +344,14 @@ public class ConfigManager implements IManager {
this.consensusManager.get().start();
}
+ protected void setNodeManager(NodeInfo nodeInfo) {
+ this.nodeManager = new NodeManager(this, nodeInfo);
+ }
+
+ protected void setLoadManager() {
+ this.loadManager = new LoadManager(this);
+ }
+
public void close() throws IOException {
if (consensusManager.get() != null) {
consensusManager.get().close();
@@ -476,11 +484,20 @@ public class ConfigManager implements IManager {
dataNodeLocation ->
nodeStatus.putIfAbsent(
dataNodeLocation.getDataNodeId(),
NodeStatus.Unknown.toString()));
- return new TShowClusterResp(
- status, configNodeLocations, dataNodeLocations, nodeStatus,
nodeVersionInfo);
+
+ return new TShowClusterResp()
+ .setStatus(status)
+ .setConfigNodeList(configNodeLocations)
+ .setDataNodeList(dataNodeLocations)
+ .setNodeStatus(nodeStatus)
+ .setNodeVersionInfo(nodeVersionInfo);
} else {
- return new TShowClusterResp(
- status, new ArrayList<>(), new ArrayList<>(), new HashMap<>(), new
HashMap<>());
+ return new TShowClusterResp()
+ .setStatus(status)
+ .setConfigNodeList(Collections.emptyList())
+ .setDataNodeList(Collections.emptyList())
+ .setNodeStatus(Collections.emptyMap())
+ .setNodeVersionInfo(Collections.emptyMap());
}
}
@@ -983,7 +1000,7 @@ public class ConfigManager implements IManager {
dataPartitionRespString);
}
- private TSStatus confirmLeader() {
+ protected TSStatus confirmLeader() {
// Make sure the consensus layer has been initialized
if (getConsensusManager() == null) {
return new
TSStatus(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode())
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 551a1f4ea08..3260c3679d1 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -57,7 +57,7 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class LoadManager {
- private final IManager configManager;
+ protected final IManager configManager;
/** Balancers. */
private final RegionBalancer regionBalancer;
@@ -66,9 +66,9 @@ public class LoadManager {
private final RouteBalancer routeBalancer;
/** Cluster load services. */
- private final LoadCache loadCache;
+ protected final LoadCache loadCache;
- private final HeartbeatService heartbeatService;
+ protected HeartbeatService heartbeatService;
private final StatisticsService statisticsService;
private final EventService eventService;
@@ -80,11 +80,15 @@ public class LoadManager {
this.routeBalancer = new RouteBalancer(configManager);
this.loadCache = new LoadCache();
- this.heartbeatService = new HeartbeatService(configManager, loadCache);
+ setHeartbeatService(configManager, loadCache);
this.statisticsService = new StatisticsService(loadCache);
this.eventService = new EventService(configManager, loadCache,
routeBalancer);
}
+ protected void setHeartbeatService(IManager configManager, LoadCache
loadCache) {
+ this.heartbeatService = new HeartbeatService(configManager, loadCache);
+ }
+
/**
* Generate an optimal CreateRegionGroupsPlan.
*
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
index b65b304827c..7bd24c2710b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
@@ -63,7 +63,7 @@ public class HeartbeatService {
private static final long HEARTBEAT_INTERVAL =
ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs();
- private final IManager configManager;
+ protected IManager configManager;
private final LoadCache loadCache;
/** Heartbeat executor service. */
@@ -78,10 +78,14 @@ public class HeartbeatService {
private static final int configNodeListPeriodicallySyncInterval = 100;
public HeartbeatService(IManager configManager, LoadCache loadCache) {
- this.configManager = configManager;
+ setConfigManager(configManager);
this.loadCache = loadCache;
}
+ protected void setConfigManager(IManager configManager) {
+ this.configManager = configManager;
+ }
+
/** Start the heartbeat service. */
public void startHeartbeatService() {
synchronized (heartbeatScheduleMonitor) {
@@ -198,13 +202,17 @@ public class HeartbeatService {
// Skip itself and the ConfigNode that is processing heartbeat
continue;
}
- ConfigNodeHeartbeatHandler handler =
- new ConfigNodeHeartbeatHandler(configNodeId,
configManager.getLoadManager());
+ ConfigNodeHeartbeatHandler handler =
getConfigNodeHeartbeatHandler(configNodeId);
+
AsyncConfigNodeHeartbeatClientPool.getInstance()
.getConfigNodeHeartBeat(configNodeLocation.getInternalEndPoint(),
heartbeatReq, handler);
}
}
+ protected ConfigNodeHeartbeatHandler getConfigNodeHeartbeatHandler(int
configNodeId) {
+ return new ConfigNodeHeartbeatHandler(configNodeId,
configManager.getLoadManager());
+ }
+
/**
* Send heartbeat requests to all the Registered DataNodes.
*
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index cedc2f86c9e..de4bb223c08 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -106,7 +106,7 @@ public class NodeManager {
public static final long HEARTBEAT_INTERVAL =
CONF.getHeartbeatIntervalInMs();
private final IManager configManager;
- private final NodeInfo nodeInfo;
+ protected final NodeInfo nodeInfo;
private final ReentrantLock removeConfigNodeLock;
@@ -563,6 +563,10 @@ public class NodeManager {
return dataNodeInfoList;
}
+ public int getDataNodeCpuCoreCount() {
+ return nodeInfo.getDataNodeTotalCpuCoreCount();
+ }
+
public List<TConfigNodeLocation> getRegisteredConfigNodes() {
return nodeInfo.getRegisteredConfigNodes();
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
index 155a3818be9..4f1c8819d8f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
@@ -87,14 +87,13 @@ public class NodeInfo implements SnapshotProcessor {
private final Map<Integer, TConfigNodeLocation> registeredConfigNodes;
// Registered DataNodes
- private final ReentrantReadWriteLock dataNodeInfoReadWriteLock;
-
- private final ReentrantReadWriteLock versionInfoReadWriteLock;
-
private final AtomicInteger nextNodeId = new AtomicInteger(-1);
private final Map<Integer, TDataNodeConfiguration> registeredDataNodes;
+ private final ReentrantReadWriteLock dataNodeInfoReadWriteLock;
private final Map<Integer, TNodeVersionInfo> nodeVersionInfo;
+ private final ReentrantReadWriteLock versionInfoReadWriteLock;
+
private static final String SNAPSHOT_FILENAME = "node_info.bin";
public NodeInfo() {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index b92918f58a5..cc8bdfe8959 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -96,11 +96,11 @@ public class ConfigNode implements ConfigNodeMBean {
IoTDBConstant.IOTDB_SERVICE_JMX_NAME,
IoTDBConstant.JMX_TYPE,
ServiceType.CONFIG_NODE.getJmxName());
- private final RegisterManager registerManager = new RegisterManager();
+ protected final RegisterManager registerManager = new RegisterManager();
protected ConfigManager configManager;
- private ConfigNode() {
+ protected ConfigNode() {
// We do not init anything here, so that we can re-initialize the instance
in IT.
}
@@ -299,14 +299,18 @@ public class ConfigNode implements ConfigNodeMBean {
void initConfigManager() {
try {
- configManager = new ConfigManager();
- } catch (IOException e) {
+ setConfigManager();
+ } catch (Exception e) {
LOGGER.error("Can't start ConfigNode consensus group!", e);
stop();
}
LOGGER.info("Successfully initialize ConfigManager.");
}
+ protected void setConfigManager() throws Exception {
+ this.configManager = new ConfigManager();
+ }
+
/**
* Register Non-seed {@link ConfigNode} when first startup.
*
@@ -388,11 +392,15 @@ public class ConfigNode implements ConfigNodeMBean {
// Setup RPCService
ConfigNodeRPCService configNodeRPCService = new ConfigNodeRPCService();
ConfigNodeRPCServiceProcessor configNodeRPCServiceProcessor =
- new ConfigNodeRPCServiceProcessor(configManager);
+ getConfigNodeRPCServiceProcessor();
configNodeRPCService.initSyncedServiceImpl(configNodeRPCServiceProcessor);
registerManager.register(configNodeRPCService);
}
+ protected ConfigNodeRPCServiceProcessor getConfigNodeRPCServiceProcessor() {
+ return new ConfigNodeRPCServiceProcessor(configManager);
+ }
+
private void waitForLeaderElected() {
while (!configManager.getConsensusManager().isLeaderExist()) {
LOGGER.info("Leader has not been elected yet, wait for 1 second");
@@ -433,7 +441,7 @@ public class ConfigNode implements ConfigNodeMBean {
return configManager;
}
- protected void addShutDownHook() {
+ private void addShutDownHook() {
Runtime.getRuntime().addShutdownHook(new ConfigNodeShutdownHook());
}
@@ -444,7 +452,7 @@ public class ConfigNode implements ConfigNodeMBean {
private static class ConfigNodeHolder {
- private static final ConfigNode INSTANCE = new ConfigNode();
+ private static ConfigNode instance = new ConfigNode();
private ConfigNodeHolder() {
// Empty constructor
@@ -452,6 +460,10 @@ public class ConfigNode implements ConfigNodeMBean {
}
public static ConfigNode getInstance() {
- return ConfigNodeHolder.INSTANCE;
+ return ConfigNodeHolder.instance;
+ }
+
+ public static void setInstance(ConfigNode configNode) {
+ ConfigNodeHolder.instance = configNode;
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index caa0afcb75b..6cb52ef7f27 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -192,7 +192,9 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
private static final ConfigNodeConfig CONFIG_NODE_CONFIG =
ConfigNodeDescriptor.getInstance().getConf();
- private final ConfigManager configManager;
+ protected ConfigManager configManager;
+
+ protected ConfigNodeRPCServiceProcessor() {}
public ConfigNodeRPCServiceProcessor(ConfigManager configManager) {
this.configManager = configManager;