This is an automated email from the ASF dual-hosted git repository. yongzao pushed a commit to branch ConfigNode-metric in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0a8171fcfbb447d38bc7d5f6310720a698427758 Author: YongzaoDan <[email protected]> AuthorDate: Wed Feb 15 22:59:59 2023 +0800 Basic Prometheus --- .../iotdb/confignode/conf/ConfigNodeConstant.java | 11 +- .../confignode/manager/ClusterSchemaManager.java | 3 +- .../iotdb/confignode/manager/ConfigManager.java | 7 +- .../apache/iotdb/confignode/manager/IManager.java | 1 + .../confignode/manager/PermissionManager.java | 6 +- .../iotdb/confignode/manager/SyncManager.java | 6 +- .../iotdb/confignode/manager/TriggerManager.java | 2 + .../manager/{ => consensus}/ConsensusManager.java | 20 +- .../iotdb/confignode/manager/load/LoadManager.java | 35 ++- .../manager/load/LoadManagerMetrics.java | 326 -------------------- .../iotdb/confignode/manager/node/NodeManager.java | 2 +- .../iotdb/confignode/manager/node/NodeMetrics.java | 104 +++++++ .../manager/partition/PartitionManager.java | 121 +++++--- .../manager/partition/PartitionMetrics.java | 334 +++++++++++++++++++++ .../persistence/metric/PartitionInfoMetrics.java | 197 ------------ .../partition/DatabasePartitionTable.java | 36 ++- .../persistence/partition/PartitionInfo.java | 201 +++++-------- .../persistence/schema/ClusterSchemaInfo.java | 100 +++--- .../procedure/env/ConfigNodeProcedureEnv.java | 2 +- .../procedure/store/ConfigProcedureStore.java | 2 +- .../iotdb/confignode/service/ConfigNode.java | 30 +- .../thrift/ConfigNodeRPCServiceHandlerMetrics.java | 2 +- .../thrift/ConfigNodeRPCServiceMetrics.java | 3 +- .../thrift/ConfigNodeRPCServiceProcessor.java | 2 +- .../apache/iotdb/commons/cluster/NodeStatus.java | 1 + .../org/apache/iotdb/commons/cluster/NodeType.java | 3 +- .../iotdb/commons/cluster/RegionRoleType.java | 1 + .../iotdb/commons/service/metric/enums/Metric.java | 16 +- 28 files changed, 748 insertions(+), 826 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java index 5ee02a2095..214db2c595 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.confignode.conf; import java.io.File; @@ -40,16 +41,6 @@ public class ConfigNodeConstant { public static final int MIN_SUPPORTED_JDK_VERSION = 8; - /** These variables are only used for cluster gauge metrics */ - public static final String METRIC_TAG_TOTAL = "total"; - - public static final String METRIC_STATUS_REGISTER = "Registered"; - public static final String METRIC_STATUS_ONLINE = "Online"; - public static final String METRIC_STATUS_UNKNOWN = "Unknown"; - - public static final String METRIC_CONFIG_NODE = "ConfigNode"; - public static final String METRIC_DATA_NODE = "DataNode"; - public static final String REMOVE_CONFIGNODE_USAGE = "Executed failed, check usage: <Node-id>/<internal_address>:<internal_port>"; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java index b2b45472a1..09c1a0b430 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java @@ -60,6 +60,7 @@ import org.apache.iotdb.confignode.consensus.response.template.AllTemplateSetInf import org.apache.iotdb.confignode.consensus.response.template.TemplateInfoResp; import org.apache.iotdb.confignode.consensus.response.template.TemplateSetInfoResp; import org.apache.iotdb.confignode.exception.DatabaseNotExistsException; +import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; import org.apache.iotdb.confignode.manager.node.NodeManager; import org.apache.iotdb.confignode.manager.observer.NodeStatisticsEvent; import org.apache.iotdb.confignode.manager.partition.PartitionManager; @@ -322,7 +323,7 @@ public class ClusterSchemaManager { // Get related DataNodes Set<TDataNodeLocation> dataNodeLocations = getPartitionManager() - .getStorageGroupRelatedDataNodes(storageGroup, TConsensusGroupType.DataRegion); + .getDatabaseRelatedDataNodes(storageGroup, TConsensusGroupType.DataRegion); for (TDataNodeLocation dataNodeLocation : dataNodeLocations) { dataNodeLocationMap.putIfAbsent(dataNodeLocation.getDataNodeId(), dataNodeLocation); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 327a8f5afc..183b4d538e 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -36,6 +36,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathPatternTree; +import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.sync.pipe.PipeMessage; import org.apache.iotdb.commons.utils.AuthUtils; import org.apache.iotdb.commons.utils.PathUtils; @@ -80,11 +81,14 @@ import org.apache.iotdb.confignode.consensus.response.partition.SchemaNodeManage import org.apache.iotdb.confignode.consensus.response.partition.SchemaPartitionResp; import org.apache.iotdb.confignode.consensus.response.template.TemplateSetInfoResp; import org.apache.iotdb.confignode.consensus.statemachine.ConfigNodeRegionStateMachine; +import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; import org.apache.iotdb.confignode.manager.cq.CQManager; import org.apache.iotdb.confignode.manager.load.LoadManager; import org.apache.iotdb.confignode.manager.node.ClusterNodeStartUtils; import org.apache.iotdb.confignode.manager.node.NodeManager; +import org.apache.iotdb.confignode.manager.node.NodeMetrics; import org.apache.iotdb.confignode.manager.partition.PartitionManager; +import org.apache.iotdb.confignode.manager.partition.PartitionMetrics; import org.apache.iotdb.confignode.persistence.AuthorInfo; import org.apache.iotdb.confignode.persistence.ProcedureInfo; import org.apache.iotdb.confignode.persistence.TriggerInfo; @@ -1347,7 +1351,8 @@ public class ConfigManager implements IManager { @Override public void addMetrics() { - partitionManager.addMetrics(); + MetricService.getInstance().addMetricSet(new NodeMetrics(getNodeManager())); + MetricService.getInstance().addMetricSet(new PartitionMetrics(this)); } @Override diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java index d59791c5ab..0d87a87833 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java @@ -43,6 +43,7 @@ import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetTTLPl import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetTimePartitionIntervalPlan; import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan; import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan; +import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; import org.apache.iotdb.confignode.manager.cq.CQManager; import org.apache.iotdb.confignode.manager.load.LoadManager; import org.apache.iotdb.confignode.manager.node.NodeManager; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java index 3aee9c071d..098a67a88d 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java @@ -26,22 +26,18 @@ import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType; import org.apache.iotdb.confignode.consensus.request.auth.AuthorPlan; import org.apache.iotdb.confignode.consensus.response.auth.PermissionInfoResp; +import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; import org.apache.iotdb.confignode.persistence.AuthorInfo; import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp; import org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.List; /** manager permission query and operation */ public class PermissionManager { - private static final Logger logger = LoggerFactory.getLogger(PermissionManager.class); - private final ConfigManager configManager; private final AuthorInfo authorInfo; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/SyncManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/SyncManager.java index 9e5645af7b..dc2c7bdd95 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/SyncManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/SyncManager.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.confignode.manager; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; @@ -43,6 +44,7 @@ import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPla import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlan; import org.apache.iotdb.confignode.consensus.response.pipe.PipeResp; import org.apache.iotdb.confignode.consensus.response.pipe.PipeSinkResp; +import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; import org.apache.iotdb.confignode.manager.node.NodeManager; import org.apache.iotdb.confignode.manager.observer.NodeStatisticsEvent; import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo; @@ -301,8 +303,4 @@ public class SyncManager { private ConsensusManager getConsensusManager() { return configManager.getConsensusManager(); } - - private ProcedureManager getProcedureManager() { - return configManager.getProcedureManager(); - } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java index ae094acc3e..08c6f392a9 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java @@ -37,6 +37,7 @@ import org.apache.iotdb.confignode.consensus.response.trigger.TransferringTrigge import org.apache.iotdb.confignode.consensus.response.trigger.TriggerLocationResp; import org.apache.iotdb.confignode.consensus.response.trigger.TriggerTableResp; import org.apache.iotdb.confignode.consensus.response.udf.JarResp; +import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; import org.apache.iotdb.confignode.manager.node.NodeManager; import org.apache.iotdb.confignode.persistence.TriggerInfo; import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq; @@ -64,6 +65,7 @@ import java.util.Map; import java.util.Optional; public class TriggerManager { + private static final Logger LOGGER = LoggerFactory.getLogger(TriggerManager.class); private final ConfigManager configManager; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java similarity index 96% rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java index aa914cf7c0..28a5304bf2 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.confignode.manager; +package org.apache.iotdb.confignode.manager.consensus; import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; @@ -24,13 +24,13 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.consensus.ConfigNodeRegionId; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.exception.BadNodeUrlException; -import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.confignode.conf.ConfigNodeConfig; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.conf.SystemPropertiesUtils; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; import org.apache.iotdb.confignode.consensus.statemachine.ConfigNodeRegionStateMachine; import org.apache.iotdb.confignode.exception.AddPeerException; +import org.apache.iotdb.confignode.manager.IManager; import org.apache.iotdb.confignode.manager.node.NodeManager; import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.consensus.IConsensus; @@ -346,20 +346,4 @@ public class ConsensusManager { private NodeManager getNodeManager() { return configManager.getNodeManager(); } - - @TestOnly - public void singleCopyMayWaitUntilLeaderReady() { - long startTime = System.currentTimeMillis(); - long maxWaitTime = 1000L * 60; // milliseconds, which is 60s - try { - while (!consensusImpl.isLeader(consensusGroupId)) { - TimeUnit.MILLISECONDS.sleep(100); - long elapsed = System.currentTimeMillis() - startTime; - if (elapsed > maxWaitTime) { - return; - } - } - } catch (InterruptedException ignored) { - } - } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java index 33d9ed4398..0ba371216f 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java @@ -29,7 +29,6 @@ import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.partition.DataPartitionTable; import org.apache.iotdb.commons.partition.SchemaPartitionTable; -import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.confignode.client.DataNodeRequestType; import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool; import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler; @@ -39,8 +38,6 @@ import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGr import org.apache.iotdb.confignode.exception.DatabaseNotExistsException; import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException; import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException; -import org.apache.iotdb.confignode.manager.ClusterSchemaManager; -import org.apache.iotdb.confignode.manager.ConsensusManager; import org.apache.iotdb.confignode.manager.IManager; import org.apache.iotdb.confignode.manager.load.balancer.PartitionBalancer; import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer; @@ -68,6 +65,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; /** @@ -108,8 +106,6 @@ public class LoadManager { eventBus.register(configManager.getClusterSchemaManager()); eventBus.register(configManager.getSyncManager()); - - MetricService.getInstance().addMetricSet(new LoadManagerMetrics(configManager)); } /** @@ -151,10 +147,31 @@ public class LoadManager { return partitionBalancer.allocateDataPartition(unassignedDataPartitionSlotsMap); } + /** @return Map<RegionGroupId, DataNodeId where the leader resides> */ public Map<TConsensusGroupId, Integer> getLatestRegionLeaderMap() { return routeBalancer.getLatestRegionLeaderMap(); } + /** + * Get the number of RegionGroup-leaders in the specified DataNode + * + * @param dataNodeId The specified DataNode + * @param type SchemaRegion or DataRegion + * @return The number of RegionGroup-leaders + */ + public int getRegionGroupLeaderCount(int dataNodeId, TConsensusGroupType type) { + AtomicInteger result = new AtomicInteger(0); + routeBalancer + .getLatestRegionLeaderMap() + .forEach( + ((consensusGroupId, leaderId) -> { + if (dataNodeId == leaderId && type.equals(consensusGroupId.getType())) { + result.getAndIncrement(); + } + })); + return result.get(); + } + /** * Generate an optimal real-time read/write requests routing policy. * @@ -333,18 +350,10 @@ public class LoadManager { return routeBalancer; } - private ConsensusManager getConsensusManager() { - return configManager.getConsensusManager(); - } - private NodeManager getNodeManager() { return configManager.getNodeManager(); } - private ClusterSchemaManager getClusterSchemaManager() { - return configManager.getClusterSchemaManager(); - } - private PartitionManager getPartitionManager() { return configManager.getPartitionManager(); } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManagerMetrics.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManagerMetrics.java deleted file mode 100644 index 6198acc5ae..0000000000 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManagerMetrics.java +++ /dev/null @@ -1,326 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.confignode.manager.load; - -import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; -import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; -import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; -import org.apache.iotdb.commons.cluster.NodeStatus; -import org.apache.iotdb.commons.service.metric.enums.Metric; -import org.apache.iotdb.commons.service.metric.enums.Tag; -import org.apache.iotdb.commons.utils.NodeUrlUtils; -import org.apache.iotdb.confignode.manager.IManager; -import org.apache.iotdb.confignode.manager.node.NodeManager; -import org.apache.iotdb.confignode.manager.partition.PartitionManager; -import org.apache.iotdb.metrics.AbstractMetricService; -import org.apache.iotdb.metrics.metricsets.IMetricSet; -import org.apache.iotdb.metrics.utils.MetricLevel; -import org.apache.iotdb.metrics.utils.MetricType; - -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; - -import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.METRIC_CONFIG_NODE; -import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.METRIC_DATA_NODE; -import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.METRIC_STATUS_ONLINE; -import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.METRIC_STATUS_REGISTER; -import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.METRIC_STATUS_UNKNOWN; -import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.METRIC_TAG_TOTAL; - -/** This class collates metrics about loadManager */ -public class LoadManagerMetrics implements IMetricSet { - - private final IManager configManager; - - public LoadManagerMetrics(IManager configManager) { - this.configManager = configManager; - } - - @Override - public void bindTo(AbstractMetricService metricService) { - metricService.createAutoGauge( - Metric.CONFIG_NODE.toString(), - MetricLevel.CORE, - this, - o -> getRegisterConfigNodesNum(metricService), - Tag.NAME.toString(), - METRIC_TAG_TOTAL, - Tag.STATUS.toString(), - METRIC_STATUS_REGISTER); - - metricService.createAutoGauge( - Metric.DATA_NODE.toString(), - MetricLevel.CORE, - this, - o -> getRegisterDataNodesNum(metricService), - Tag.NAME.toString(), - METRIC_TAG_TOTAL, - Tag.STATUS.toString(), - METRIC_STATUS_REGISTER); - - metricService.createAutoGauge( - Metric.CONFIG_NODE.toString(), - MetricLevel.CORE, - this, - o -> getRunningConfigNodesNum(metricService), - Tag.NAME.toString(), - METRIC_TAG_TOTAL, - Tag.STATUS.toString(), - METRIC_STATUS_ONLINE); - - metricService.createAutoGauge( - Metric.DATA_NODE.toString(), - MetricLevel.CORE, - this, - o -> getRunningDataNodesNum(metricService), - Tag.NAME.toString(), - METRIC_TAG_TOTAL, - Tag.STATUS.toString(), - METRIC_STATUS_ONLINE); - - metricService.createAutoGauge( - Metric.CONFIG_NODE.toString(), - MetricLevel.CORE, - this, - o -> getUnknownConfigNodesNum(metricService), - Tag.NAME.toString(), - METRIC_TAG_TOTAL, - Tag.STATUS.toString(), - METRIC_STATUS_UNKNOWN); - - metricService.createAutoGauge( - Metric.DATA_NODE.toString(), - MetricLevel.CORE, - this, - o -> getUnknownDataNodesNum(metricService), - Tag.NAME.toString(), - METRIC_TAG_TOTAL, - Tag.STATUS.toString(), - METRIC_STATUS_UNKNOWN); - } - - @Override - public void unbindFrom(AbstractMetricService metricService) { - metricService.remove( - MetricType.AUTO_GAUGE, - Metric.CONFIG_NODE.toString(), - Tag.NAME.toString(), - METRIC_TAG_TOTAL, - Tag.STATUS.toString(), - METRIC_STATUS_REGISTER); - - metricService.remove( - MetricType.AUTO_GAUGE, - Metric.DATA_NODE.toString(), - Tag.NAME.toString(), - METRIC_TAG_TOTAL, - Tag.STATUS.toString(), - METRIC_STATUS_REGISTER); - - metricService.remove( - MetricType.AUTO_GAUGE, - Metric.CONFIG_NODE.toString(), - Tag.NAME.toString(), - METRIC_TAG_TOTAL, - Tag.STATUS.toString(), - METRIC_STATUS_ONLINE); - - metricService.remove( - MetricType.AUTO_GAUGE, - Metric.DATA_NODE.toString(), - Tag.NAME.toString(), - METRIC_TAG_TOTAL, - Tag.STATUS.toString(), - METRIC_STATUS_ONLINE); - - metricService.remove( - MetricType.AUTO_GAUGE, - Metric.CONFIG_NODE.toString(), - Tag.NAME.toString(), - METRIC_TAG_TOTAL, - Tag.STATUS.toString(), - METRIC_STATUS_UNKNOWN); - - metricService.remove( - MetricType.AUTO_GAUGE, - Metric.DATA_NODE.toString(), - Tag.NAME.toString(), - METRIC_TAG_TOTAL, - Tag.STATUS.toString(), - METRIC_STATUS_UNKNOWN); - - getNodeManager() - .getRegisteredDataNodes() - .forEach( - dataNodeInfo -> { - TDataNodeLocation dataNodeLocation = dataNodeInfo.getLocation(); - String name = - NodeUrlUtils.convertTEndPointUrl(dataNodeLocation.getClientRpcEndPoint()); - - metricService.remove( - MetricType.GAUGE, - Metric.CLUSTER_NODE_LEADER_COUNT.toString(), - Tag.NAME.toString(), - name); - }); - } - - private NodeManager getNodeManager() { - return configManager.getNodeManager(); - } - - private PartitionManager getPartitionManager() { - return configManager.getPartitionManager(); - } - - private LoadManager getLoadManager() { - return configManager.getLoadManager(); - } - - private int getRegisterConfigNodesNum(AbstractMetricService metricService) { - return getNodeManager().getRegisteredConfigNodes().size(); - } - - private int getRegisterDataNodesNum(AbstractMetricService metricService) { - List<TDataNodeConfiguration> dataNodeConfigurations = getNodeManager().getRegisteredDataNodes(); - Map<Integer, Integer> idToCountMap = new ConcurrentHashMap<>(); - - getLoadManager() - .getLatestRegionLeaderMap() - .forEach((consensusGroupId, nodeId) -> idToCountMap.merge(nodeId, 1, Integer::sum)); - for (TDataNodeConfiguration dataNodeInfo : dataNodeConfigurations) { - TDataNodeLocation dataNodeLocation = dataNodeInfo.getLocation(); - int dataNodeId = dataNodeLocation.getDataNodeId(); - String name = NodeUrlUtils.convertTEndPointUrl(dataNodeLocation.getClientRpcEndPoint()); - metricService - .getOrCreateGauge( - Metric.CLUSTER_NODE_LEADER_COUNT.toString(), - MetricLevel.CORE, - Tag.NAME.toString(), - name) - .set(idToCountMap.getOrDefault(dataNodeId, 0)); - } - return dataNodeConfigurations.size(); - } - - private int getRunningConfigNodesNum(AbstractMetricService metricService) { - List<TConfigNodeLocation> runningConfigNodes = - getNodeManager().filterConfigNodeThroughStatus(NodeStatus.Running); - if (runningConfigNodes == null) { - return 0; - } - for (TConfigNodeLocation configNodeLocation : runningConfigNodes) { - String name = NodeUrlUtils.convertTEndPointUrl(configNodeLocation.getInternalEndPoint()); - - metricService - .getOrCreateGauge( - Metric.CLUSTER_NODE_STATUS.toString(), - MetricLevel.CORE, - Tag.NAME.toString(), - name, - Tag.TYPE.toString(), - METRIC_CONFIG_NODE) - .set(1); - } - return runningConfigNodes.size(); - } - - private int getRunningDataNodesNum(AbstractMetricService metricService) { - List<TDataNodeConfiguration> runningDataNodes = - getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running); - if (runningDataNodes == null) { - return 0; - } - for (TDataNodeConfiguration dataNodeInfo : runningDataNodes) { - TDataNodeLocation dataNodeLocation = dataNodeInfo.getLocation(); - String name = NodeUrlUtils.convertTEndPointUrl(dataNodeLocation.getClientRpcEndPoint()); - - metricService - .getOrCreateGauge( - Metric.CLUSTER_NODE_STATUS.toString(), - MetricLevel.CORE, - Tag.NAME.toString(), - name, - Tag.TYPE.toString(), - METRIC_DATA_NODE) - .set(1); - } - return runningDataNodes.size(); - } - - private int getUnknownConfigNodesNum(AbstractMetricService metricService) { - List<TConfigNodeLocation> unknownConfigNodes = - getNodeManager().filterConfigNodeThroughStatus(NodeStatus.Unknown); - if (unknownConfigNodes == null) { - return 0; - } - for (TConfigNodeLocation configNodeLocation : unknownConfigNodes) { - String name = NodeUrlUtils.convertTEndPointUrl(configNodeLocation.getInternalEndPoint()); - - metricService - .getOrCreateGauge( - Metric.CLUSTER_NODE_STATUS.toString(), - MetricLevel.CORE, - Tag.NAME.toString(), - name, - Tag.TYPE.toString(), - METRIC_CONFIG_NODE) - .set(0); - } - return unknownConfigNodes.size(); - } - - private int getUnknownDataNodesNum(AbstractMetricService metricService) { - List<TDataNodeConfiguration> unknownDataNodes = - getNodeManager().filterDataNodeThroughStatus(NodeStatus.Unknown); - if (unknownDataNodes == null) { - return 0; - } - for (TDataNodeConfiguration dataNodeInfo : unknownDataNodes) { - TDataNodeLocation dataNodeLocation = dataNodeInfo.getLocation(); - String name = NodeUrlUtils.convertTEndPointUrl(dataNodeLocation.getClientRpcEndPoint()); - - metricService - .getOrCreateGauge( - Metric.CLUSTER_NODE_STATUS.toString(), - MetricLevel.CORE, - Tag.NAME.toString(), - name, - Tag.TYPE.toString(), - METRIC_DATA_NODE) - .set(0); - } - return unknownDataNodes.size(); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - LoadManagerMetrics that = (LoadManagerMetrics) o; - return Objects.equals(configManager, that.configManager); - } - - @Override - public int hashCode() { - return Objects.hash(configManager); - } -} diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java index a7040004c0..f648be3731 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java @@ -53,10 +53,10 @@ import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeRegisterR import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp; import org.apache.iotdb.confignode.manager.ClusterSchemaManager; import org.apache.iotdb.confignode.manager.ConfigManager; -import org.apache.iotdb.confignode.manager.ConsensusManager; import org.apache.iotdb.confignode.manager.IManager; import org.apache.iotdb.confignode.manager.TriggerManager; import org.apache.iotdb.confignode.manager.UDFManager; +import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; import org.apache.iotdb.confignode.manager.load.LoadManager; import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache; import org.apache.iotdb.confignode.manager.node.heartbeat.ConfigNodeHeartbeatCache; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeMetrics.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeMetrics.java new file mode 100644 index 0000000000..9c9eca0567 --- /dev/null +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeMetrics.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.node; + +import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.cluster.NodeType; +import org.apache.iotdb.commons.service.metric.enums.Metric; +import org.apache.iotdb.commons.service.metric.enums.Tag; +import org.apache.iotdb.metrics.AbstractMetricService; +import org.apache.iotdb.metrics.metricsets.IMetricSet; +import org.apache.iotdb.metrics.utils.MetricLevel; +import org.apache.iotdb.metrics.utils.MetricType; + +import java.util.Objects; + +/** Monitoring cluster Nodes Metrics */ +public class NodeMetrics implements IMetricSet { + + private final NodeManager nodeManager; + + public NodeMetrics(NodeManager nodeManager) { + this.nodeManager = nodeManager; + } + + @Override + public void bindTo(AbstractMetricService metricService) { + for (NodeStatus status : NodeStatus.values()) { + metricService.createAutoGauge( + Metric.NODE_NUM.toString(), + MetricLevel.CORE, + nodeManager, + nodeManager -> nodeManager.filterConfigNodeThroughStatus(status).size(), + Tag.TYPE.toString(), + NodeType.ConfigNode.getNodeType(), + Tag.STATUS.toString(), + status.getStatus()); + + metricService.createAutoGauge( + Metric.NODE_NUM.toString(), + MetricLevel.CORE, + nodeManager, + nodeManager -> nodeManager.filterDataNodeThroughStatus(status).size(), + Tag.TYPE.toString(), + NodeType.DataNode.getNodeType(), + Tag.STATUS.toString(), + status.getStatus()); + } + } + + @Override + public void unbindFrom(AbstractMetricService metricService) { + for (NodeStatus status : NodeStatus.values()) { + metricService.remove( + MetricType.AUTO_GAUGE, + Metric.NODE_NUM.toString(), + Tag.TYPE.toString(), + NodeType.ConfigNode.getNodeType(), + Tag.STATUS.toString(), + status.getStatus()); + + metricService.remove( + MetricType.AUTO_GAUGE, + Metric.NODE_NUM.toString(), + Tag.TYPE.toString(), + NodeType.DataNode.getNodeType(), + Tag.STATUS.toString(), + status.getStatus()); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + NodeMetrics that = (NodeMetrics) o; + return nodeManager.equals(that.nodeManager); + } + + @Override + public int hashCode() { + return Objects.hash(nodeManager); + } +} diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java index 1529a8d594..dc818eb4f2 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java @@ -32,7 +32,6 @@ import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.partition.DataPartitionTable; import org.apache.iotdb.commons.partition.SchemaPartitionTable; import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor; -import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.confignode.client.DataNodeRequestType; import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool; import org.apache.iotdb.confignode.conf.ConfigNodeConfig; @@ -63,12 +62,11 @@ import org.apache.iotdb.confignode.exception.DatabaseNotExistsException; import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException; import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException; import org.apache.iotdb.confignode.manager.ClusterSchemaManager; -import org.apache.iotdb.confignode.manager.ConsensusManager; import org.apache.iotdb.confignode.manager.IManager; import org.apache.iotdb.confignode.manager.ProcedureManager; +import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; import org.apache.iotdb.confignode.manager.load.LoadManager; import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache; -import org.apache.iotdb.confignode.persistence.metric.PartitionInfoMetrics; import org.apache.iotdb.confignode.persistence.partition.PartitionInfo; import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionCreateTask; import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask; @@ -88,7 +86,6 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Arrays; -import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -97,6 +94,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; /** The PartitionManager Manages cluster PartitionTable read and write requests. */ @@ -505,15 +503,15 @@ public class PartitionManager { } /** - * Get the DataNodes who contain the specific StorageGroup's Schema or Data + * Get the DataNodes who contain the specified Database's Schema or Data * - * @param storageGroup The specific StorageGroup's name + * @param database The specific Database's name * @param type SchemaRegion or DataRegion * @return Set<TDataNodeLocation>, the related DataNodes */ - public Set<TDataNodeLocation> getStorageGroupRelatedDataNodes( - String storageGroup, TConsensusGroupType type) { - return partitionInfo.getStorageGroupRelatedDataNodes(storageGroup, type); + public Set<TDataNodeLocation> getDatabaseRelatedDataNodes( + String database, TConsensusGroupType type) { + return partitionInfo.getDatabaseRelatedDataNodes(database, type); } /** @@ -550,26 +548,51 @@ public class PartitionManager { /** * Only leader use this interface. * - * @param storageGroup The specified StorageGroup + * @param database The specified Database * @return All Regions' RegionReplicaSet of the specified StorageGroup */ - public List<TRegionReplicaSet> getAllReplicaSets(String storageGroup) { - return partitionInfo.getAllReplicaSets(storageGroup); + public List<TRegionReplicaSet> getAllReplicaSets(String database) { + return partitionInfo.getAllReplicaSets(database); } /** * Only leader use this interface. * - * <p>Get the number of RegionGroups currently owned by the specific StorageGroup + * <p>Get the number of Regions currently owned by the specified DataNode * - * @param storageGroup StorageGroupName + * @param dataNodeId The specified DataNode * @param type SchemaRegion or DataRegion - * @return Number of Regions currently owned by the specific StorageGroup - * @throws DatabaseNotExistsException When the specific StorageGroup doesn't exist + * @return The number of Regions currently owned by the specified DataNode */ - public int getRegionGroupCount(String storageGroup, TConsensusGroupType type) + public int getRegionCount(int dataNodeId, TConsensusGroupType type) { + return partitionInfo.getRegionCount(dataNodeId, type); + } + + /** + * Only leader use this interface. + * + * <p>Get the number of RegionGroups currently owned by the specified Database + * + * @param database DatabaseName + * @param type SchemaRegion or DataRegion + * @return Number of Regions currently owned by the specified Database + * @throws DatabaseNotExistsException When the specified Database doesn't exist + */ + public int getRegionGroupCount(String database, TConsensusGroupType type) throws DatabaseNotExistsException { - return partitionInfo.getRegionGroupCount(storageGroup, type); + return partitionInfo.getRegionGroupCount(database, type); + } + + /** + * Only leader use this interface. + * + * <p>Get the assigned SeriesPartitionSlots count in the specified Database + * + * @param database The specified Database + * @return The assigned SeriesPartitionSlots count + */ + public int getAssignedSeriesPartitionSlotsCount(String database) { + return partitionInfo.getAssignedSeriesPartitionSlotsCount(database); } /** @@ -600,25 +623,22 @@ public class PartitionManager { throw new NoAvailableRegionGroupException(type); } - result.sort(new PartitionComparator()); + result.sort( + (o1, o2) -> { + // Use the number of partitions as the first priority + if (o1.getLeft() < o2.getLeft()) { + return -1; + } else if (o1.getLeft() > o2.getLeft()) { + return 1; + } else { + // Use RegionGroup status as second priority, Running > Available > Discouraged + return getRegionGroupStatus(o1.getRight()) + .compareTo(getRegionGroupStatus(o2.getRight())); + } + }); return result; } - class PartitionComparator implements Comparator<Pair<Long, TConsensusGroupId>> { - - @Override - public int compare(Pair<Long, TConsensusGroupId> o1, Pair<Long, TConsensusGroupId> o2) { - // Use partition number as first priority - if (o1.getLeft() < o2.getLeft()) { - return -1; - } else if (o1.getLeft() > o2.getLeft()) { - return 1; - } else { - // Use RegionGroup status as second priority, Running > Available > Discouraged - return getRegionGroupStatus(o1.getRight()).compareTo(getRegionGroupStatus(o2.getRight())); - } - } - } /** * Only leader use this interface * @@ -649,10 +669,6 @@ public class PartitionManager { getConsensusManager().write(preDeleteDatabasePlan); } - public void addMetrics() { - MetricService.getInstance().addMetricSet(new PartitionInfoMetrics(partitionInfo)); - } - /** * Get TSeriesPartitionSlot * @@ -730,6 +746,7 @@ public class PartitionManager { public GetSeriesSlotListResp getSeriesSlotList(GetSeriesSlotListPlan plan) { return (GetSeriesSlotListResp) getConsensusManager().read(plan).getDataset(); } + /** * get database for region * @@ -885,6 +902,34 @@ public class PartitionManager { .collect(Collectors.toList()); } + /** + * Count the number of cluster Regions with specified RegionStatus + * + * @param type The specified RegionGroupType + * @param status The specified statues + * @return The number of cluster Regions with specified RegionStatus + */ + public int countRegionWithSpecifiedStatus(TConsensusGroupType type, RegionStatus... status) { + AtomicInteger result = new AtomicInteger(0); + regionGroupCacheMap.forEach( + (regionGroupId, regionGroupCache) -> { + if (type.equals(regionGroupId.getType())) { + regionGroupCache + .getStatistics() + .getRegionStatisticsMap() + .values() + .forEach( + regionStatistics -> { + if (Arrays.stream(status) + .anyMatch(s -> s.equals(regionStatistics.getRegionStatus()))) { + result.getAndIncrement(); + } + }); + } + }); + return result.get(); + } + /** * Safely get RegionStatus * diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java new file mode 100644 index 0000000000..c5a3dd49e7 --- /dev/null +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.partition; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; +import org.apache.iotdb.commons.cluster.RegionStatus; +import org.apache.iotdb.commons.service.metric.enums.Metric; +import org.apache.iotdb.commons.service.metric.enums.Tag; +import org.apache.iotdb.commons.utils.NodeUrlUtils; +import org.apache.iotdb.confignode.exception.DatabaseNotExistsException; +import org.apache.iotdb.confignode.manager.ClusterSchemaManager; +import org.apache.iotdb.confignode.manager.IManager; +import org.apache.iotdb.confignode.manager.load.LoadManager; +import org.apache.iotdb.confignode.manager.node.NodeManager; +import org.apache.iotdb.metrics.AbstractMetricService; +import org.apache.iotdb.metrics.metricsets.IMetricSet; +import org.apache.iotdb.metrics.utils.MetricLevel; +import org.apache.iotdb.metrics.utils.MetricType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Objects; + +public class PartitionMetrics implements IMetricSet { + + private static final Logger LOGGER = LoggerFactory.getLogger(PartitionMetrics.class); + + private final IManager configManager; + + public PartitionMetrics(IManager configManager) { + this.configManager = configManager; + } + + @Override + public void bindTo(AbstractMetricService metricService) { + bindRegionPartitionMetrics(metricService); + bindDataNodePartitionMetrics(metricService); + bindDatabasePartitionMetrics(metricService); + } + + @Override + public void unbindFrom(AbstractMetricService metricService) { + unbindRegionPartitionMetrics(metricService); + unbindDataNodePartitionMetrics(metricService); + unbindDatabasePartitionMetrics(metricService); + } + + private void bindRegionPartitionMetrics(AbstractMetricService metricService) { + for (RegionStatus status : RegionStatus.values()) { + // Count the number of SchemaRegions + metricService.createAutoGauge( + Metric.REGION_NUM.toString(), + MetricLevel.CORE, + getPartitionManager(), + partitionManager -> + partitionManager.countRegionWithSpecifiedStatus( + TConsensusGroupType.SchemaRegion, status), + Tag.TYPE.toString(), + TConsensusGroupType.SchemaRegion.toString(), + Tag.STATUS.toString(), + status.getStatus()); + + // Count the number of DataRegions + metricService.createAutoGauge( + Metric.REGION_NUM.toString(), + MetricLevel.CORE, + getPartitionManager(), + partitionManager -> + partitionManager.countRegionWithSpecifiedStatus( + TConsensusGroupType.DataRegion, status), + Tag.TYPE.toString(), + TConsensusGroupType.DataRegion.toString(), + Tag.STATUS.toString(), + status.getStatus()); + } + } + + private void unbindRegionPartitionMetrics(AbstractMetricService metricService) { + for (RegionStatus status : RegionStatus.values()) { + // Remove the number of SchemaRegions + metricService.remove( + MetricType.AUTO_GAUGE, + Metric.REGION_NUM.toString(), + Tag.TYPE.toString(), + TConsensusGroupType.SchemaRegion.toString(), + Tag.STATUS.toString(), + status.getStatus()); + + // Remove the number of DataRegions + metricService.remove( + MetricType.AUTO_GAUGE, + Metric.REGION_NUM.toString(), + Tag.TYPE.toString(), + TConsensusGroupType.DataRegion.toString(), + Tag.STATUS.toString(), + status.getStatus()); + } + } + + private void bindDataNodePartitionMetrics(AbstractMetricService metricService) { + List<TDataNodeConfiguration> registerDataNodes = getNodeManager().getRegisteredDataNodes(); + for (TDataNodeConfiguration dataNodeConfiguration : registerDataNodes) { + int dataNodeId = dataNodeConfiguration.getLocation().getDataNodeId(); + String dataNodeName = + NodeUrlUtils.convertTEndPointUrl( + dataNodeConfiguration.getLocation().getClientRpcEndPoint()); + + // Count the number of Regions in the specified DataNode + metricService.createAutoGauge( + Metric.REGION_NUM_IN_DATA_NODE.toString(), + MetricLevel.CORE, + getPartitionManager(), + partitionManager -> + partitionManager.getRegionCount(dataNodeId, TConsensusGroupType.SchemaRegion), + Tag.NAME.toString(), + dataNodeName, + Tag.TYPE.toString(), + TConsensusGroupType.SchemaRegion.toString()); + metricService.createAutoGauge( + Metric.REGION_NUM_IN_DATA_NODE.toString(), + MetricLevel.CORE, + getPartitionManager(), + partitionManager -> + partitionManager.getRegionCount(dataNodeId, TConsensusGroupType.DataRegion), + Tag.NAME.toString(), + dataNodeName, + Tag.TYPE.toString(), + TConsensusGroupType.DataRegion.toString()); + + // Count the number of RegionGroup-leaders in the specified DataNode + metricService.createAutoGauge( + Metric.REGION_GROUP_LEADER_NUM_IN_DATA_NODE.toString(), + MetricLevel.CORE, + getLoadManager(), + loadManager -> + loadManager.getRegionGroupLeaderCount(dataNodeId, TConsensusGroupType.SchemaRegion), + Tag.NAME.toString(), + dataNodeName, + Tag.TYPE.toString(), + TConsensusGroupType.SchemaRegion.toString()); + metricService.createAutoGauge( + Metric.REGION_GROUP_LEADER_NUM_IN_DATA_NODE.toString(), + MetricLevel.CORE, + getLoadManager(), + loadManager -> + loadManager.getRegionGroupLeaderCount(dataNodeId, TConsensusGroupType.DataRegion), + Tag.NAME.toString(), + dataNodeName, + Tag.TYPE.toString(), + TConsensusGroupType.DataRegion.toString()); + } + } + + private void unbindDataNodePartitionMetrics(AbstractMetricService metricService) { + List<TDataNodeConfiguration> registerDataNodes = getNodeManager().getRegisteredDataNodes(); + for (TDataNodeConfiguration dataNodeConfiguration : registerDataNodes) { + String dataNodeName = + NodeUrlUtils.convertTEndPointUrl( + dataNodeConfiguration.getLocation().getClientRpcEndPoint()); + + // Remove the number of Regions in the specified DataNode + metricService.remove( + MetricType.AUTO_GAUGE, + Metric.REGION_NUM_IN_DATA_NODE.toString(), + Tag.NAME.toString(), + dataNodeName, + Tag.TYPE.toString(), + TConsensusGroupType.SchemaRegion.toString()); + metricService.remove( + MetricType.AUTO_GAUGE, + Metric.REGION_NUM_IN_DATA_NODE.toString(), + Tag.NAME.toString(), + dataNodeName, + Tag.TYPE.toString(), + TConsensusGroupType.DataRegion.toString()); + + // Remove the number of RegionGroup-leaders in the specified DataNode + metricService.remove( + MetricType.AUTO_GAUGE, + Metric.REGION_GROUP_LEADER_NUM_IN_DATA_NODE.toString(), + Tag.NAME.toString(), + dataNodeName, + Tag.TYPE.toString(), + TConsensusGroupType.SchemaRegion.toString()); + metricService.remove( + MetricType.AUTO_GAUGE, + Metric.REGION_GROUP_LEADER_NUM_IN_DATA_NODE.toString(), + Tag.NAME.toString(), + dataNodeName, + Tag.TYPE.toString(), + TConsensusGroupType.DataRegion.toString()); + } + } + + private void bindDatabasePartitionMetrics(AbstractMetricService metricService) { + // Count the number of Databases + metricService.createAutoGauge( + Metric.DATABASE_NUM.toString(), + MetricLevel.CORE, + getClusterSchemaManager(), + clusterSchemaManager -> clusterSchemaManager.getDatabaseNames().size()); + + List<String> databases = getClusterSchemaManager().getDatabaseNames(); + for (String database : databases) { + // Count the number of SeriesSlots in the specified Database + metricService.createAutoGauge( + Metric.SERIES_SLOT_NUM_IN_DATABASE.toString(), + MetricLevel.CORE, + getPartitionManager(), + partitionManager -> partitionManager.getAssignedSeriesPartitionSlotsCount(database), + Tag.NAME.toString(), + database); + + // Count the number of RegionGroups in the specified Database + metricService.createAutoGauge( + Metric.REGION_GROUP_NUM_IN_DATABASE.toString(), + MetricLevel.CORE, + getPartitionManager(), + partitionManager -> { + try { + return partitionManager.getRegionGroupCount( + database, TConsensusGroupType.SchemaRegion); + } catch (DatabaseNotExistsException e) { + LOGGER.warn("Error when counting SchemaRegionGroups in Database: {}", database, e); + throw new RuntimeException(e); + } + }, + Tag.NAME.toString(), + database, + Tag.TYPE.toString(), + TConsensusGroupType.SchemaRegion.toString()); + metricService.createAutoGauge( + Metric.REGION_GROUP_NUM_IN_DATABASE.toString(), + MetricLevel.CORE, + getPartitionManager(), + partitionManager -> { + try { + return partitionManager.getRegionGroupCount(database, TConsensusGroupType.DataRegion); + } catch (DatabaseNotExistsException e) { + LOGGER.warn("Error when counting DataRegionGroups in Database: {}", database, e); + throw new RuntimeException(e); + } + }, + Tag.NAME.toString(), + database, + Tag.TYPE.toString(), + TConsensusGroupType.DataRegion.toString()); + } + } + + private void unbindDatabasePartitionMetrics(AbstractMetricService metricService) { + // Remove the number of Databases + metricService.remove(MetricType.AUTO_GAUGE, Metric.DATABASE_NUM.toString()); + + List<String> databases = getClusterSchemaManager().getDatabaseNames(); + for (String database : databases) { + // Remove the number of SeriesSlots in the specified Database + metricService.remove( + MetricType.AUTO_GAUGE, + Metric.SERIES_SLOT_NUM_IN_DATABASE.toString(), + Tag.NAME.toString(), + database); + + // Remove number of RegionGroups in the specified Database + metricService.remove( + MetricType.AUTO_GAUGE, + Metric.REGION_GROUP_NUM_IN_DATABASE.toString(), + Tag.NAME.toString(), + database, + Tag.TYPE.toString(), + TConsensusGroupType.SchemaRegion.toString()); + metricService.remove( + MetricType.AUTO_GAUGE, + Metric.REGION_GROUP_NUM_IN_DATABASE.toString(), + Tag.NAME.toString(), + database, + Tag.TYPE.toString(), + TConsensusGroupType.DataRegion.toString()); + } + } + + private NodeManager getNodeManager() { + return configManager.getNodeManager(); + } + + private ClusterSchemaManager getClusterSchemaManager() { + return configManager.getClusterSchemaManager(); + } + + private PartitionManager getPartitionManager() { + return configManager.getPartitionManager(); + } + + private LoadManager getLoadManager() { + return configManager.getLoadManager(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PartitionMetrics that = (PartitionMetrics) o; + return configManager.equals(that.configManager); + } + + @Override + public int hashCode() { + return Objects.hash(configManager); + } +} diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/metric/PartitionInfoMetrics.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/metric/PartitionInfoMetrics.java deleted file mode 100644 index 022f15be4e..0000000000 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/metric/PartitionInfoMetrics.java +++ /dev/null @@ -1,197 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.confignode.persistence.metric; - -import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; -import org.apache.iotdb.commons.service.metric.enums.Metric; -import org.apache.iotdb.commons.service.metric.enums.Tag; -import org.apache.iotdb.confignode.persistence.partition.DatabasePartitionTable; -import org.apache.iotdb.confignode.persistence.partition.PartitionInfo; -import org.apache.iotdb.metrics.AbstractMetricService; -import org.apache.iotdb.metrics.metricsets.IMetricSet; -import org.apache.iotdb.metrics.utils.MetricLevel; -import org.apache.iotdb.metrics.utils.MetricType; - -import java.util.Objects; - -public class PartitionInfoMetrics implements IMetricSet { - private PartitionInfo partitionInfo; - - public PartitionInfoMetrics(PartitionInfo partitionInfo) { - this.partitionInfo = partitionInfo; - } - - @Override - public void bindTo(AbstractMetricService metricService) { - metricService.createAutoGauge( - Metric.QUANTITY.toString(), - MetricLevel.CORE, - partitionInfo, - PartitionInfo::getStorageGroupPartitionTableSize, - Tag.NAME.toString(), - "database"); - metricService.createAutoGauge( - Metric.REGION.toString(), - MetricLevel.IMPORTANT, - partitionInfo, - o -> o.updateRegionGroupMetric(TConsensusGroupType.SchemaRegion), - Tag.NAME.toString(), - "total", - Tag.TYPE.toString(), - TConsensusGroupType.SchemaRegion.toString()); - metricService.createAutoGauge( - Metric.REGION.toString(), - MetricLevel.IMPORTANT, - partitionInfo, - o -> o.updateRegionGroupMetric(TConsensusGroupType.DataRegion), - Tag.NAME.toString(), - "total", - Tag.TYPE.toString(), - TConsensusGroupType.DataRegion.toString()); - } - - @Override - public void unbindFrom(AbstractMetricService metricService) { - metricService.remove( - MetricType.AUTO_GAUGE, Metric.QUANTITY.toString(), Tag.NAME.toString(), "database"); - metricService.remove( - MetricType.AUTO_GAUGE, - Metric.REGION.toString(), - Tag.NAME.toString(), - "total", - Tag.TYPE.toString(), - TConsensusGroupType.SchemaRegion.toString()); - metricService.remove( - MetricType.AUTO_GAUGE, - Metric.REGION.toString(), - Tag.NAME.toString(), - "total", - Tag.TYPE.toString(), - TConsensusGroupType.DataRegion.toString()); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - PartitionInfoMetrics that = (PartitionInfoMetrics) o; - return Objects.equals(partitionInfo, that.partitionInfo); - } - - @Override - public int hashCode() { - return Objects.hash(partitionInfo); - } - - public static class StorageGroupPartitionTableMetrics implements IMetricSet { - private DatabasePartitionTable databasePartitionTable; - - public StorageGroupPartitionTableMetrics(DatabasePartitionTable databasePartitionTable) { - this.databasePartitionTable = databasePartitionTable; - } - - @Override - public void bindTo(AbstractMetricService metricService) { - metricService.createAutoGauge( - Metric.REGION.toString(), - MetricLevel.NORMAL, - databasePartitionTable, - o -> o.getRegionGroupCount(TConsensusGroupType.SchemaRegion), - Tag.NAME.toString(), - databasePartitionTable.getDatabaseName(), - Tag.TYPE.toString(), - TConsensusGroupType.SchemaRegion.toString()); - metricService.createAutoGauge( - Metric.REGION.toString(), - MetricLevel.NORMAL, - databasePartitionTable, - o -> o.getRegionGroupCount(TConsensusGroupType.DataRegion), - Tag.NAME.toString(), - databasePartitionTable.getDatabaseName(), - Tag.TYPE.toString(), - TConsensusGroupType.DataRegion.toString()); - // TODO slot will be updated in the future - metricService.createAutoGauge( - Metric.SLOT.toString(), - MetricLevel.NORMAL, - databasePartitionTable, - DatabasePartitionTable::getSchemaPartitionMapSize, - Tag.NAME.toString(), - databasePartitionTable.getDatabaseName(), - Tag.TYPE.toString(), - "schemaSlotNumber"); - metricService.createAutoGauge( - Metric.SLOT.toString(), - MetricLevel.NORMAL, - databasePartitionTable, - DatabasePartitionTable::getDataPartitionMapSize, - Tag.NAME.toString(), - databasePartitionTable.getDatabaseName(), - Tag.TYPE.toString(), - "dataSlotNumber"); - } - - @Override - public void unbindFrom(AbstractMetricService metricService) { - metricService.remove( - MetricType.AUTO_GAUGE, - Metric.REGION.toString(), - Tag.NAME.toString(), - databasePartitionTable.getDatabaseName(), - Tag.TYPE.toString(), - TConsensusGroupType.SchemaRegion.toString()); - metricService.remove( - MetricType.AUTO_GAUGE, - Metric.REGION.toString(), - Tag.NAME.toString(), - databasePartitionTable.getDatabaseName(), - Tag.TYPE.toString(), - TConsensusGroupType.DataRegion.toString()); - // TODO slot will be updated in the future - metricService.remove( - MetricType.AUTO_GAUGE, - Metric.SLOT.toString(), - Tag.NAME.toString(), - databasePartitionTable.getDatabaseName(), - Tag.TYPE.toString(), - "schemaSlotNumber"); - metricService.remove( - MetricType.AUTO_GAUGE, - Metric.SLOT.toString(), - Tag.NAME.toString(), - databasePartitionTable.getDatabaseName(), - Tag.TYPE.toString(), - "dataSlotNumber"); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - StorageGroupPartitionTableMetrics that = (StorageGroupPartitionTableMetrics) o; - return Objects.equals(databasePartitionTable, that.databasePartitionTable); - } - - @Override - public int hashCode() { - return Objects.hash(databasePartitionTable); - } - } -} diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java index c432edad3b..5866a092c3 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java @@ -137,23 +137,35 @@ public class DatabasePartitionTable { } /** - * Get all RegionGroups currently owned by this StorageGroup + * Only leader use this interface. + * + * <p>Get the number of Regions currently owned by the specified DataNode * + * @param dataNodeId The specified DataNode * @param type SchemaRegion or DataRegion - * @return The regions currently owned by this StorageGroup + * @return The number of Regions currently owned by the specified DataNode */ - public Set<RegionGroup> getRegionGroups(TConsensusGroupType type) { - Set<RegionGroup> regionGroups = new HashSet<>(); + public int getRegionCount(int dataNodeId, TConsensusGroupType type) { + AtomicInteger result = new AtomicInteger(0); regionGroupMap .values() .forEach( regionGroup -> { - if (regionGroup.getId().getType().equals(type)) { - regionGroups.add(regionGroup); + if (type.equals(regionGroup.getId().getType())) { + regionGroup + .getReplicaSet() + .getDataNodeLocations() + .forEach( + dataNodeLocation -> { + if (dataNodeLocation.getDataNodeId() == dataNodeId) { + result.getAndIncrement(); + } + }); } }); - return regionGroups; + return result.get(); } + /** * Get the number of RegionGroups currently owned by this StorageGroup * @@ -271,7 +283,7 @@ public class DatabasePartitionTable { * @param type SchemaRegion or DataRegion * @return Set<TDataNodeLocation>, the related DataNodes */ - public Set<TDataNodeLocation> getStorageGroupRelatedDataNodes(TConsensusGroupType type) { + public Set<TDataNodeLocation> getDatabaseRelatedDataNodes(TConsensusGroupType type) { HashSet<TDataNodeLocation> result = new HashSet<>(); regionGroupMap.forEach( (consensusGroupId, regionGroup) -> { @@ -491,14 +503,6 @@ public class DatabasePartitionTable { return databaseName; } - public int getDataPartitionMapSize() { - return dataPartitionTable.getDataPartitionMap().size(); - } - - public int getSchemaPartitionMapSize() { - return schemaPartitionTable.getSchemaPartitionMap().size(); - } - @Override public boolean equals(Object o) { if (this == o) { diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java index 5e9ef561e1..2b17ffcd4d 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java @@ -28,9 +28,6 @@ import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.partition.DataPartitionTable; import org.apache.iotdb.commons.partition.SchemaPartitionTable; -import org.apache.iotdb.commons.service.metric.MetricService; -import org.apache.iotdb.commons.service.metric.enums.Metric; -import org.apache.iotdb.commons.service.metric.enums.Tag; import org.apache.iotdb.commons.snapshot.SnapshotProcessor; import org.apache.iotdb.confignode.consensus.request.read.partition.GetDataPartitionPlan; import org.apache.iotdb.confignode.consensus.request.read.partition.GetSchemaPartitionPlan; @@ -54,13 +51,11 @@ import org.apache.iotdb.confignode.consensus.response.partition.RegionInfoListRe import org.apache.iotdb.confignode.consensus.response.partition.SchemaNodeManagementResp; import org.apache.iotdb.confignode.consensus.response.partition.SchemaPartitionResp; import org.apache.iotdb.confignode.exception.DatabaseNotExistsException; -import org.apache.iotdb.confignode.persistence.metric.PartitionInfoMetrics; import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainTask; import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo; import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; import org.apache.iotdb.consensus.common.DataSet; -import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.utils.Pair; @@ -79,8 +74,6 @@ import java.io.FileOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -105,7 +98,7 @@ public class PartitionInfo implements SnapshotProcessor { // For allocating Regions private final AtomicInteger nextRegionGroupId; // Map<StorageGroupName, StorageGroupPartitionInfo> - private final Map<String, DatabasePartitionTable> storageGroupPartitionTables; + private final Map<String, DatabasePartitionTable> databasePartitionTables; /** For Region-Maintainer */ // For RegionReplicas' asynchronous management @@ -115,7 +108,7 @@ public class PartitionInfo implements SnapshotProcessor { public PartitionInfo() { this.nextRegionGroupId = new AtomicInteger(-1); - this.storageGroupPartitionTables = new ConcurrentHashMap<>(); + this.databasePartitionTables = new ConcurrentHashMap<>(); this.regionMaintainTaskList = Collections.synchronizedList(new ArrayList<>()); } @@ -137,10 +130,7 @@ public class PartitionInfo implements SnapshotProcessor { public TSStatus createDatabase(DatabaseSchemaPlan plan) { String storageGroupName = plan.getSchema().getName(); DatabasePartitionTable databasePartitionTable = new DatabasePartitionTable(storageGroupName); - storageGroupPartitionTables.put(storageGroupName, databasePartitionTable); - MetricService.getInstance() - .addMetricSet( - new PartitionInfoMetrics.StorageGroupPartitionTableMetrics(databasePartitionTable)); + databasePartitionTables.put(storageGroupName, databasePartitionTable); return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } @@ -157,7 +147,7 @@ public class PartitionInfo implements SnapshotProcessor { plan.getRegionGroupMap() .forEach( (storageGroup, regionReplicaSets) -> { - storageGroupPartitionTables.get(storageGroup).createRegionGroups(regionReplicaSets); + databasePartitionTables.get(storageGroup).createRegionGroups(regionReplicaSets); regionReplicaSets.forEach( regionReplicaSet -> maxRegionId.set( @@ -224,7 +214,7 @@ public class PartitionInfo implements SnapshotProcessor { final PreDeleteDatabasePlan.PreDeleteType preDeleteType = preDeleteDatabasePlan.getPreDeleteType(); final String storageGroup = preDeleteDatabasePlan.getStorageGroup(); - DatabasePartitionTable databasePartitionTable = storageGroupPartitionTables.get(storageGroup); + DatabasePartitionTable databasePartitionTable = databasePartitionTables.get(storageGroup); if (databasePartitionTable == null) { return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } @@ -248,7 +238,7 @@ public class PartitionInfo implements SnapshotProcessor { */ public void deleteDatabase(DeleteDatabasePlan plan) { // Clean the StorageGroupTable cache - storageGroupPartitionTables.remove(plan.getName()); + databasePartitionTables.remove(plan.getName()); } /** @@ -264,7 +254,7 @@ public class PartitionInfo implements SnapshotProcessor { if (plan.getPartitionSlotsMap().size() == 0) { // Return all SchemaPartitions when the queried PartitionSlots are empty - storageGroupPartitionTables.forEach( + databasePartitionTables.forEach( (storageGroup, databasePartitionTable) -> { if (!databasePartitionTable.isPredeleted()) { schemaPartition.put(storageGroup, new SchemaPartitionTable()); @@ -286,7 +276,7 @@ public class PartitionInfo implements SnapshotProcessor { if (isStorageGroupExisted(storageGroup)) { schemaPartition.put(storageGroup, new SchemaPartitionTable()); - if (!storageGroupPartitionTables + if (!databasePartitionTables .get(storageGroup) .getSchemaPartition(partitionSlots, schemaPartition.get(storageGroup))) { isAllPartitionsExist.set(false); @@ -325,7 +315,7 @@ public class PartitionInfo implements SnapshotProcessor { if (isStorageGroupExisted(storageGroup)) { dataPartition.put(storageGroup, new DataPartitionTable()); - if (!storageGroupPartitionTables + if (!databasePartitionTables .get(storageGroup) .getDataPartition(partitionSlots, dataPartition.get(storageGroup))) { isAllPartitionsExist.set(false); @@ -360,8 +350,8 @@ public class PartitionInfo implements SnapshotProcessor { TSeriesPartitionSlot seriesPartitionSlot, TTimePartitionSlot timePartitionSlot, long timePartitionInterval) { - if (storageGroupPartitionTables.containsKey(storageGroup)) { - return storageGroupPartitionTables + if (databasePartitionTables.containsKey(storageGroup)) { + return databasePartitionTables .get(storageGroup) .getPrecededDataPartition(seriesPartitionSlot, timePartitionSlot, timePartitionInterval); } else { @@ -370,8 +360,7 @@ public class PartitionInfo implements SnapshotProcessor { } private boolean isStorageGroupExisted(String storageGroup) { - final DatabasePartitionTable databasePartitionTable = - storageGroupPartitionTables.get(storageGroup); + final DatabasePartitionTable databasePartitionTable = databasePartitionTables.get(storageGroup); return databasePartitionTable != null && !databasePartitionTable.isPredeleted(); } @@ -386,7 +375,7 @@ public class PartitionInfo implements SnapshotProcessor { .forEach( (storageGroup, schemaPartitionTable) -> { if (isStorageGroupExisted(storageGroup)) { - storageGroupPartitionTables + databasePartitionTables .get(storageGroup) .createSchemaPartition(schemaPartitionTable); } @@ -406,9 +395,7 @@ public class PartitionInfo implements SnapshotProcessor { .forEach( (storageGroup, dataPartitionTable) -> { if (isStorageGroupExisted(storageGroup)) { - storageGroupPartitionTables - .get(storageGroup) - .createDataPartition(dataPartitionTable); + databasePartitionTables.get(storageGroup).createDataPartition(dataPartitionTable); } }); @@ -426,7 +413,7 @@ public class PartitionInfo implements SnapshotProcessor { storageGroup -> { schemaPartitionMap.put(storageGroup, new SchemaPartitionTable()); - storageGroupPartitionTables + databasePartitionTables .get(storageGroup) .getSchemaPartition(new ArrayList<>(), schemaPartitionMap.get(storageGroup)); @@ -445,14 +432,14 @@ public class PartitionInfo implements SnapshotProcessor { public DataSet getRegionInfoList(GetRegionInfoListPlan regionsInfoPlan) { RegionInfoListResp regionResp = new RegionInfoListResp(); List<TRegionInfo> regionInfoList = new Vector<>(); - if (storageGroupPartitionTables.isEmpty()) { + if (databasePartitionTables.isEmpty()) { regionResp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); regionResp.setRegionInfoList(new ArrayList<>()); return regionResp; } TShowRegionReq showRegionReq = regionsInfoPlan.getShowRegionReq(); final List<String> storageGroups = showRegionReq != null ? showRegionReq.getDatabases() : null; - storageGroupPartitionTables.forEach( + databasePartitionTables.forEach( (storageGroup, databasePartitionTable) -> { if (storageGroups != null && !storageGroups.contains(storageGroup)) { return; @@ -480,7 +467,7 @@ public class PartitionInfo implements SnapshotProcessor { TConsensusGroupId regionId = req.getRegionId(); TDataNodeLocation oldNode = req.getOldNode(); TDataNodeLocation newNode = req.getNewNode(); - storageGroupPartitionTables.values().stream() + databasePartitionTables.values().stream() .filter(sgPartitionTable -> sgPartitionTable.containRegion(regionId)) .forEach( sgPartitionTable -> sgPartitionTable.updateRegionLocation(regionId, oldNode, newNode)); @@ -496,7 +483,7 @@ public class PartitionInfo implements SnapshotProcessor { */ public String getRegionStorageGroup(TConsensusGroupId regionId) { Optional<DatabasePartitionTable> sgPartitionTableOptional = - storageGroupPartitionTables.values().stream() + databasePartitionTables.values().stream() .filter(s -> s.containRegion(regionId)) .findFirst(); return sgPartitionTableOptional.map(DatabasePartitionTable::getDatabaseName).orElse(null); @@ -522,7 +509,7 @@ public class PartitionInfo implements SnapshotProcessor { if (isStorageGroupExisted(storageGroup)) { result.put( storageGroup, - storageGroupPartitionTables + databasePartitionTables .get(storageGroup) .filterUnassignedSchemaPartitionSlots(partitionSlots)); } @@ -547,7 +534,7 @@ public class PartitionInfo implements SnapshotProcessor { if (isStorageGroupExisted(storageGroup)) { result.put( storageGroup, - storageGroupPartitionTables + databasePartitionTables .get(storageGroup) .filterUnassignedDataPartitionSlots(partitionSlots)); } @@ -563,7 +550,7 @@ public class PartitionInfo implements SnapshotProcessor { */ public List<TRegionReplicaSet> getAllReplicaSets() { List<TRegionReplicaSet> result = new ArrayList<>(); - storageGroupPartitionTables + databasePartitionTables .values() .forEach( databasePartitionTable -> result.addAll(databasePartitionTable.getAllReplicaSets())); @@ -578,7 +565,7 @@ public class PartitionInfo implements SnapshotProcessor { */ public List<TRegionReplicaSet> getAllReplicaSets(TConsensusGroupType type) { List<TRegionReplicaSet> result = new ArrayList<>(); - storageGroupPartitionTables + databasePartitionTables .values() .forEach( databasePartitionTable -> @@ -589,12 +576,12 @@ public class PartitionInfo implements SnapshotProcessor { /** * Only leader use this interface. * - * @param storageGroup The specified StorageGroup - * @return All Regions' RegionReplicaSet of the specified StorageGroup + * @param database The specified Database + * @return All Regions' RegionReplicaSet of the specified Database */ - public List<TRegionReplicaSet> getAllReplicaSets(String storageGroup) { - if (storageGroupPartitionTables.containsKey(storageGroup)) { - return storageGroupPartitionTables.get(storageGroup).getAllReplicaSets(); + public List<TRegionReplicaSet> getAllReplicaSets(String database) { + if (databasePartitionTables.containsKey(database)) { + return databasePartitionTables.get(database).getAllReplicaSets(); } else { return new ArrayList<>(); } @@ -603,36 +590,63 @@ public class PartitionInfo implements SnapshotProcessor { /** * Only leader use this interface. * - * <p>Get the number of RegionGroups currently owned by the specified StorageGroup + * <p>Get the number of Regions currently owned by the specified DataNode * - * @param storageGroup StorageGroupName + * @param dataNodeId The specified DataNode + * @param type SchemaRegion or DataRegion + * @return The number of Regions currently owned by the specified DataNode + */ + public int getRegionCount(int dataNodeId, TConsensusGroupType type) { + AtomicInteger result = new AtomicInteger(0); + databasePartitionTables + .values() + .forEach( + databasePartitionTable -> + result.getAndAdd(databasePartitionTable.getRegionCount(dataNodeId, type))); + return result.get(); + } + + /** + * Only leader use this interface. + * + * <p>Get the number of RegionGroups currently owned by the specified Database + * + * @param database DatabaseName * @param type SchemaRegion or DataRegion * @return Number of Regions currently owned by the specific StorageGroup * @throws DatabaseNotExistsException When the specific StorageGroup doesn't exist */ - public int getRegionGroupCount(String storageGroup, TConsensusGroupType type) + public int getRegionGroupCount(String database, TConsensusGroupType type) throws DatabaseNotExistsException { - if (!isStorageGroupExisted(storageGroup)) { - throw new DatabaseNotExistsException(storageGroup); + if (!isStorageGroupExisted(database)) { + throw new DatabaseNotExistsException(database); } - return storageGroupPartitionTables.get(storageGroup).getRegionGroupCount(type); + return databasePartitionTables.get(database).getRegionGroupCount(type); } - public int getAssignedSeriesPartitionSlotsCount(String storageGroup) { - return storageGroupPartitionTables.get(storageGroup).getAssignedSeriesPartitionSlotsCount(); + /** + * Only leader use this interface. + * + * <p>Get the assigned SeriesPartitionSlots count in the specified Database + * + * @param database The specified Database + * @return The assigned SeriesPartitionSlots count + */ + public int getAssignedSeriesPartitionSlotsCount(String database) { + return databasePartitionTables.get(database).getAssignedSeriesPartitionSlotsCount(); } /** * Get the DataNodes who contain the specific StorageGroup's Schema or Data * - * @param storageGroup The specific StorageGroup's name + * @param database The specific StorageGroup's name * @param type SchemaRegion or DataRegion * @return Set<TDataNodeLocation>, the related DataNodes */ - public Set<TDataNodeLocation> getStorageGroupRelatedDataNodes( - String storageGroup, TConsensusGroupType type) { - return storageGroupPartitionTables.get(storageGroup).getStorageGroupRelatedDataNodes(type); + public Set<TDataNodeLocation> getDatabaseRelatedDataNodes( + String database, TConsensusGroupType type) { + return databasePartitionTables.get(database).getDatabaseRelatedDataNodes(type); } /** @@ -645,53 +659,7 @@ public class PartitionInfo implements SnapshotProcessor { */ public List<Pair<Long, TConsensusGroupId>> getRegionGroupSlotsCounter( String storageGroup, TConsensusGroupType type) { - return storageGroupPartitionTables.get(storageGroup).getRegionGroupSlotsCounter(type); - } - - /** - * Update RegionGroup-related metric - * - * @param type SchemaRegion or DataRegion - * @return the number of SchemaRegion or DataRegion - */ - public int updateRegionGroupMetric(TConsensusGroupType type) { - Set<RegionGroup> regionGroups = new HashSet<>(); - for (Map.Entry<String, DatabasePartitionTable> entry : storageGroupPartitionTables.entrySet()) { - regionGroups.addAll(entry.getValue().getRegionGroups(type)); - } - int result = regionGroups.size(); - // datanode location -> region number - Map<TDataNodeLocation, Integer> dataNodeLocationIntegerMap = new HashMap<>(); - for (RegionGroup regionGroup : regionGroups) { - TRegionReplicaSet regionReplicaSet = regionGroup.getReplicaSet(); - List<TDataNodeLocation> dataNodeLocations = regionReplicaSet.getDataNodeLocations(); - for (TDataNodeLocation dataNodeLocation : dataNodeLocations) { - if (!dataNodeLocationIntegerMap.containsKey(dataNodeLocation)) { - dataNodeLocationIntegerMap.put(dataNodeLocation, 0); - } - dataNodeLocationIntegerMap.put( - dataNodeLocation, dataNodeLocationIntegerMap.get(dataNodeLocation) + 1); - } - } - for (Map.Entry<TDataNodeLocation, Integer> entry : dataNodeLocationIntegerMap.entrySet()) { - TDataNodeLocation dataNodeLocation = entry.getKey(); - String name = - "EndPoint(" - + dataNodeLocation.getClientRpcEndPoint().ip - + ":" - + dataNodeLocation.getClientRpcEndPoint().port - + ")"; - MetricService.getInstance() - .getOrCreateGauge( - Metric.REGION.toString(), - MetricLevel.IMPORTANT, - Tag.NAME.toString(), - name, - Tag.TYPE.toString(), - type.toString()) - .set(dataNodeLocationIntegerMap.get(dataNodeLocation)); - } - return result; + return databasePartitionTables.get(storageGroup).getRegionGroupSlotsCounter(type); } @Override @@ -717,9 +685,9 @@ public class PartitionInfo implements SnapshotProcessor { ReadWriteIOUtils.write(nextRegionGroupId.get(), fileOutputStream); // serialize StorageGroupPartitionTable - ReadWriteIOUtils.write(storageGroupPartitionTables.size(), fileOutputStream); + ReadWriteIOUtils.write(databasePartitionTables.size(), fileOutputStream); for (Map.Entry<String, DatabasePartitionTable> storageGroupPartitionTableEntry : - storageGroupPartitionTables.entrySet()) { + databasePartitionTables.entrySet()) { ReadWriteIOUtils.write(storageGroupPartitionTableEntry.getKey(), fileOutputStream); storageGroupPartitionTableEntry.getValue().serialize(fileOutputStream, protocol); } @@ -776,7 +744,7 @@ public class PartitionInfo implements SnapshotProcessor { } DatabasePartitionTable databasePartitionTable = new DatabasePartitionTable(storageGroup); databasePartitionTable.deserialize(fileInputStream, protocol); - storageGroupPartitionTables.put(storageGroup, databasePartitionTable); + databasePartitionTables.put(storageGroup, databasePartitionTable); } // restore deletedRegionSet @@ -793,8 +761,7 @@ public class PartitionInfo implements SnapshotProcessor { return new GetRegionIdResp( new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), new ArrayList<>()); } - DatabasePartitionTable sgPartitionTable = - storageGroupPartitionTables.get(plan.getStorageGroup()); + DatabasePartitionTable sgPartitionTable = databasePartitionTables.get(plan.getStorageGroup()); return new GetRegionIdResp( new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), sgPartitionTable.getRegionId( @@ -806,8 +773,7 @@ public class PartitionInfo implements SnapshotProcessor { return new GetTimeSlotListResp( new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), new ArrayList<>()); } - DatabasePartitionTable sgPartitionTable = - storageGroupPartitionTables.get(plan.getStorageGroup()); + DatabasePartitionTable sgPartitionTable = databasePartitionTables.get(plan.getStorageGroup()); return new GetTimeSlotListResp( new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), sgPartitionTable.getTimeSlotList( @@ -819,35 +785,34 @@ public class PartitionInfo implements SnapshotProcessor { return new GetSeriesSlotListResp( new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), new ArrayList<>()); } - DatabasePartitionTable sgPartitionTable = - storageGroupPartitionTables.get(plan.getStorageGroup()); + DatabasePartitionTable sgPartitionTable = databasePartitionTables.get(plan.getStorageGroup()); return new GetSeriesSlotListResp( new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), sgPartitionTable.getSeriesSlotList(plan.getPartitionType())); } - public int getStorageGroupPartitionTableSize() { - return storageGroupPartitionTables.size(); - } - public void clear() { nextRegionGroupId.set(-1); - storageGroupPartitionTables.clear(); + databasePartitionTables.clear(); regionMaintainTaskList.clear(); } @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } PartitionInfo that = (PartitionInfo) o; return nextRegionGroupId.get() == that.nextRegionGroupId.get() - && storageGroupPartitionTables.equals(that.storageGroupPartitionTables) + && databasePartitionTables.equals(that.databasePartitionTables) && regionMaintainTaskList.equals(that.regionMaintainTaskList); } @Override public int hashCode() { - return Objects.hash(nextRegionGroupId, storageGroupPartitionTables, regionMaintainTaskList); + return Objects.hash(nextRegionGroupId, databasePartitionTables, regionMaintainTaskList); } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ClusterSchemaInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ClusterSchemaInfo.java index ccd4b4f9f4..20c5b25f87 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ClusterSchemaInfo.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ClusterSchemaInfo.java @@ -92,7 +92,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(ClusterSchemaInfo.class); // StorageGroup read write lock - private final ReentrantReadWriteLock storageGroupReadWriteLock; + private final ReentrantReadWriteLock databaseReadWriteLock; private final ConfigMTree mTree; private static final String SNAPSHOT_FILENAME = "cluster_schema.bin"; @@ -102,7 +102,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor { private final TemplateTable templateTable; public ClusterSchemaInfo() throws IOException { - storageGroupReadWriteLock = new ReentrantReadWriteLock(); + databaseReadWriteLock = new ReentrantReadWriteLock(); try { mTree = new ConfigMTree(); @@ -125,7 +125,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor { */ public TSStatus createDatabase(DatabaseSchemaPlan plan) { TSStatus result = new TSStatus(); - storageGroupReadWriteLock.writeLock().lock(); + databaseReadWriteLock.writeLock().lock(); try { // Set StorageGroup TDatabaseSchema storageGroupSchema = plan.getSchema(); @@ -142,7 +142,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor { LOGGER.error(ERROR_NAME, e); result.setCode(e.getErrorCode()).setMessage(e.getMessage()); } finally { - storageGroupReadWriteLock.writeLock().unlock(); + databaseReadWriteLock.writeLock().unlock(); } return result; } @@ -155,7 +155,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor { */ public TSStatus alterDatabase(DatabaseSchemaPlan plan) { TSStatus result = new TSStatus(); - storageGroupReadWriteLock.writeLock().lock(); + databaseReadWriteLock.writeLock().lock(); try { TDatabaseSchema alterSchema = plan.getSchema(); PartialPath partialPathName = new PartialPath(alterSchema.getName()); @@ -202,7 +202,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor { LOGGER.error(ERROR_NAME, e); result.setCode(e.getErrorCode()).setMessage(e.getMessage()); } finally { - storageGroupReadWriteLock.writeLock().unlock(); + databaseReadWriteLock.writeLock().unlock(); } return result; } @@ -215,7 +215,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor { */ public TSStatus deleteDatabase(DeleteDatabasePlan plan) { TSStatus result = new TSStatus(); - storageGroupReadWriteLock.writeLock().lock(); + databaseReadWriteLock.writeLock().lock(); try { // Delete StorageGroup String storageGroup = plan.getName(); @@ -229,7 +229,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor { .setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()) .setMessage("Database not exist: " + e.getMessage()); } finally { - storageGroupReadWriteLock.writeLock().unlock(); + databaseReadWriteLock.writeLock().unlock(); } return result; } @@ -237,7 +237,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor { /** @return The number of matched StorageGroups by the specific StorageGroup pattern */ public CountDatabaseResp countMatchedDatabases(CountDatabasePlan plan) { CountDatabaseResp result = new CountDatabaseResp(); - storageGroupReadWriteLock.readLock().lock(); + databaseReadWriteLock.readLock().lock(); try { PartialPath patternPath = new PartialPath(plan.getStorageGroupPattern()); result.setCount(mTree.getStorageGroupNum(patternPath, false)); @@ -248,7 +248,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor { new TSStatus(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) .setMessage(ERROR_NAME + ": " + e.getMessage())); } finally { - storageGroupReadWriteLock.readLock().unlock(); + databaseReadWriteLock.readLock().unlock(); } return result; } @@ -256,7 +256,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor { /** @return All StorageGroupSchemas that matches to the specific StorageGroup pattern */ public DatabaseSchemaResp getMatchedDatabaseSchemas(GetDatabasePlan plan) { DatabaseSchemaResp result = new DatabaseSchemaResp(); - storageGroupReadWriteLock.readLock().lock(); + databaseReadWriteLock.readLock().lock(); try { Map<String, TDatabaseSchema> schemaMap = new HashMap<>(); PartialPath patternPath = new PartialPath(plan.getStorageGroupPattern()); @@ -274,14 +274,14 @@ public class ClusterSchemaInfo implements SnapshotProcessor { new TSStatus(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) .setMessage(ERROR_NAME + ": " + e.getMessage())); } finally { - storageGroupReadWriteLock.readLock().unlock(); + databaseReadWriteLock.readLock().unlock(); } return result; } public TSStatus setTTL(SetTTLPlan plan) { TSStatus result = new TSStatus(); - storageGroupReadWriteLock.writeLock().lock(); + databaseReadWriteLock.writeLock().lock(); try { PartialPath patternPath = new PartialPath(plan.getStorageGroupPathPattern()); List<PartialPath> matchedPaths = mTree.getBelongedStorageGroups(patternPath); @@ -301,14 +301,14 @@ public class ClusterSchemaInfo implements SnapshotProcessor { LOGGER.error(ERROR_NAME, e); result.setCode(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()).setMessage(ERROR_NAME); } finally { - storageGroupReadWriteLock.writeLock().unlock(); + databaseReadWriteLock.writeLock().unlock(); } return result; } public TSStatus setSchemaReplicationFactor(SetSchemaReplicationFactorPlan plan) { TSStatus result = new TSStatus(); - storageGroupReadWriteLock.writeLock().lock(); + databaseReadWriteLock.writeLock().lock(); try { PartialPath path = new PartialPath(plan.getStorageGroup()); if (mTree.isStorageGroupAlreadySet(path)) { @@ -324,14 +324,14 @@ public class ClusterSchemaInfo implements SnapshotProcessor { LOGGER.error(ERROR_NAME, e); result.setCode(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()).setMessage(ERROR_NAME); } finally { - storageGroupReadWriteLock.writeLock().unlock(); + databaseReadWriteLock.writeLock().unlock(); } return result; } public TSStatus setDataReplicationFactor(SetDataReplicationFactorPlan plan) { TSStatus result = new TSStatus(); - storageGroupReadWriteLock.writeLock().lock(); + databaseReadWriteLock.writeLock().lock(); try { PartialPath path = new PartialPath(plan.getStorageGroup()); if (mTree.isStorageGroupAlreadySet(path)) { @@ -347,14 +347,14 @@ public class ClusterSchemaInfo implements SnapshotProcessor { LOGGER.error(ERROR_NAME, e); result.setCode(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()).setMessage(ERROR_NAME); } finally { - storageGroupReadWriteLock.writeLock().unlock(); + databaseReadWriteLock.writeLock().unlock(); } return result; } public TSStatus setTimePartitionInterval(SetTimePartitionIntervalPlan plan) { TSStatus result = new TSStatus(); - storageGroupReadWriteLock.writeLock().lock(); + databaseReadWriteLock.writeLock().lock(); try { PartialPath path = new PartialPath(plan.getStorageGroup()); if (mTree.isStorageGroupAlreadySet(path)) { @@ -370,7 +370,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor { LOGGER.error(ERROR_NAME, e); result.setCode(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()).setMessage(ERROR_NAME); } finally { - storageGroupReadWriteLock.writeLock().unlock(); + databaseReadWriteLock.writeLock().unlock(); } return result; } @@ -383,7 +383,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor { */ public TSStatus adjustMaxRegionGroupCount(AdjustMaxRegionGroupNumPlan plan) { TSStatus result = new TSStatus(); - storageGroupReadWriteLock.writeLock().lock(); + databaseReadWriteLock.writeLock().lock(); try { for (Map.Entry<String, Pair<Integer, Integer>> entry : plan.getMaxRegionGroupNumMap().entrySet()) { @@ -398,7 +398,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor { LOGGER.error(ERROR_NAME, e); result.setCode(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()); } finally { - storageGroupReadWriteLock.writeLock().unlock(); + databaseReadWriteLock.writeLock().unlock(); } return result; } @@ -410,20 +410,20 @@ public class ClusterSchemaInfo implements SnapshotProcessor { /** * Only leader use this interface. * - * @return List<StorageGroupName>, all storageGroups' name + * @return List<StorageGroupName>, all Databases' name */ public List<String> getDatabaseNames() { - List<String> storageGroups = new ArrayList<>(); - storageGroupReadWriteLock.readLock().lock(); + List<String> databases = new ArrayList<>(); + databaseReadWriteLock.readLock().lock(); try { List<PartialPath> namePaths = mTree.getAllStorageGroupPaths(); for (PartialPath path : namePaths) { - storageGroups.add(path.getFullPath()); + databases.add(path.getFullPath()); } } finally { - storageGroupReadWriteLock.readLock().unlock(); + databaseReadWriteLock.readLock().unlock(); } - return storageGroups; + return databases; } /** @@ -433,11 +433,11 @@ public class ClusterSchemaInfo implements SnapshotProcessor { * @throws IllegalPathException If the specified Database's name is illegal */ public boolean isDatabaseExisted(String databaseName) throws IllegalPathException { - storageGroupReadWriteLock.readLock().lock(); + databaseReadWriteLock.readLock().lock(); try { return mTree.isStorageGroupAlreadySet(new PartialPath(databaseName)); } finally { - storageGroupReadWriteLock.readLock().unlock(); + databaseReadWriteLock.readLock().unlock(); } } @@ -448,11 +448,11 @@ public class ClusterSchemaInfo implements SnapshotProcessor { * @throws MetadataException If the specific StorageGroup already exists */ public void checkContainsStorageGroup(String storageName) throws MetadataException { - storageGroupReadWriteLock.readLock().lock(); + databaseReadWriteLock.readLock().lock(); try { mTree.checkStorageGroupAlreadySet(new PartialPath(storageName)); } finally { - storageGroupReadWriteLock.readLock().unlock(); + databaseReadWriteLock.readLock().unlock(); } } @@ -465,7 +465,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor { */ public TDatabaseSchema getMatchedDatabaseSchemaByName(String storageGroup) throws DatabaseNotExistsException { - storageGroupReadWriteLock.readLock().lock(); + databaseReadWriteLock.readLock().lock(); try { return mTree .getStorageGroupNodeByStorageGroupPath(new PartialPath(storageGroup)) @@ -473,7 +473,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor { } catch (MetadataException e) { throw new DatabaseNotExistsException(storageGroup); } finally { - storageGroupReadWriteLock.readLock().unlock(); + databaseReadWriteLock.readLock().unlock(); } } @@ -485,7 +485,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor { */ public Map<String, TDatabaseSchema> getMatchedDatabaseSchemasByName(List<String> rawPathList) { Map<String, TDatabaseSchema> schemaMap = new HashMap<>(); - storageGroupReadWriteLock.readLock().lock(); + databaseReadWriteLock.readLock().lock(); try { for (String rawPath : rawPathList) { PartialPath patternPath = new PartialPath(rawPath); @@ -498,7 +498,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor { } catch (MetadataException e) { LOGGER.warn(ERROR_NAME, e); } finally { - storageGroupReadWriteLock.readLock().unlock(); + databaseReadWriteLock.readLock().unlock(); } return schemaMap; } @@ -511,7 +511,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor { * @return maxSchemaRegionGroupNum or maxDataRegionGroupNum */ public int getMinRegionGroupNum(String database, TConsensusGroupType consensusGroupType) { - storageGroupReadWriteLock.readLock().lock(); + databaseReadWriteLock.readLock().lock(); try { PartialPath path = new PartialPath(database); TDatabaseSchema storageGroupSchema = @@ -527,7 +527,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor { LOGGER.warn(ERROR_NAME, e); return -1; } finally { - storageGroupReadWriteLock.readLock().unlock(); + databaseReadWriteLock.readLock().unlock(); } } @@ -539,7 +539,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor { * @return maxSchemaRegionGroupNum or maxDataRegionGroupNum */ public int getMaxRegionGroupNum(String database, TConsensusGroupType consensusGroupType) { - storageGroupReadWriteLock.readLock().lock(); + databaseReadWriteLock.readLock().lock(); try { PartialPath path = new PartialPath(database); TDatabaseSchema storageGroupSchema = @@ -555,7 +555,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor { LOGGER.warn(ERROR_NAME, e); return -1; } finally { - storageGroupReadWriteLock.readLock().unlock(); + databaseReadWriteLock.readLock().unlock(); } } @@ -576,7 +576,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor { File tmpFile = new File(snapshotFile.getAbsolutePath() + "-" + UUID.randomUUID()); - storageGroupReadWriteLock.readLock().lock(); + databaseReadWriteLock.readLock().lock(); try { try (FileOutputStream fileOutputStream = new FileOutputStream(tmpFile); BufferedOutputStream outputStream = new BufferedOutputStream(fileOutputStream)) { @@ -595,7 +595,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor { "Can't delete temporary snapshot file: {}, retrying...", tmpFile.getAbsolutePath()); } } - storageGroupReadWriteLock.readLock().unlock(); + databaseReadWriteLock.readLock().unlock(); } } @@ -613,14 +613,14 @@ public class ClusterSchemaInfo implements SnapshotProcessor { snapshotFile.getAbsolutePath()); return; } - storageGroupReadWriteLock.writeLock().lock(); + databaseReadWriteLock.writeLock().lock(); try (FileInputStream fileInputStream = new FileInputStream(snapshotFile); BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream)) { // Load snapshot of MTree mTree.clear(); mTree.deserialize(bufferedInputStream); } finally { - storageGroupReadWriteLock.writeLock().unlock(); + databaseReadWriteLock.writeLock().unlock(); } } @@ -628,13 +628,13 @@ public class ClusterSchemaInfo implements SnapshotProcessor { PartialPath partialPath, int level) { Pair<List<PartialPath>, Set<PartialPath>> matchedPathsInNextLevel = new Pair(new HashSet<>(), new HashSet<>()); - storageGroupReadWriteLock.readLock().lock(); + databaseReadWriteLock.readLock().lock(); try { matchedPathsInNextLevel = mTree.getNodesListInGivenLevel(partialPath, level, true); } catch (MetadataException e) { LOGGER.error("Error get matched paths in given level.", e); } finally { - storageGroupReadWriteLock.readLock().unlock(); + databaseReadWriteLock.readLock().unlock(); } return matchedPathsInNextLevel; } @@ -643,13 +643,13 @@ public class ClusterSchemaInfo implements SnapshotProcessor { PartialPath partialPath) { Pair<Set<TSchemaNode>, Set<PartialPath>> matchedPathsInNextLevel = new Pair<>(new HashSet<>(), new HashSet<>()); - storageGroupReadWriteLock.readLock().lock(); + databaseReadWriteLock.readLock().lock(); try { matchedPathsInNextLevel = mTree.getChildNodePathInNextLevel(partialPath); } catch (MetadataException e) { LOGGER.error("Error get matched paths in next level.", e); } finally { - storageGroupReadWriteLock.readLock().unlock(); + databaseReadWriteLock.readLock().unlock(); } return matchedPathsInNextLevel; } @@ -872,7 +872,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor { public Map<String, TDatabaseSchema> getMatchedStorageGroupSchemasByOneName( String[] storageGroupPathPattern) { Map<String, TDatabaseSchema> schemaMap = new HashMap<>(); - storageGroupReadWriteLock.readLock().lock(); + databaseReadWriteLock.readLock().lock(); try { PartialPath patternPath = new PartialPath(storageGroupPathPattern); List<PartialPath> matchedPaths = mTree.getBelongedStorageGroups(patternPath); @@ -883,7 +883,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor { } catch (MetadataException e) { LOGGER.warn(ERROR_NAME, e); } finally { - storageGroupReadWriteLock.readLock().unlock(); + databaseReadWriteLock.readLock().unlock(); } return schemaMap; } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java index e01bb7ebf5..ecb059d58e 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java @@ -45,7 +45,7 @@ import org.apache.iotdb.confignode.exception.AddPeerException; import org.apache.iotdb.confignode.exception.DatabaseNotExistsException; import org.apache.iotdb.confignode.manager.ClusterSchemaManager; import org.apache.iotdb.confignode.manager.ConfigManager; -import org.apache.iotdb.confignode.manager.ConsensusManager; +import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; import org.apache.iotdb.confignode.manager.load.LoadManager; import org.apache.iotdb.confignode.manager.node.NodeManager; import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java index ef27ed492f..0d209a5f9e 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java @@ -23,7 +23,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan; import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan; import org.apache.iotdb.confignode.manager.ConfigManager; -import org.apache.iotdb.confignode.manager.ConsensusManager; +import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; import org.apache.iotdb.confignode.persistence.ProcedureInfo; import org.apache.iotdb.confignode.procedure.Procedure; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java index 4310c267ee..1ecd98db8c 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java @@ -204,6 +204,21 @@ public class ConfigNode implements ConfigNodeMBean { } } + private void setUpInternalServices() throws StartupException, IOException { + // Setup JMXService + registerManager.register(new JMXService()); + JMXService.registerMBean(this, mbeanName); + + registerManager.register(MetricService.getInstance()); + // bind predefined metric sets + MetricService.getInstance().addMetricSet(new JvmMetrics()); + MetricService.getInstance().addMetricSet(new LogbackMetrics()); + MetricService.getInstance().addMetricSet(new ProcessMetrics()); + MetricService.getInstance().addMetricSet(new SystemMetrics(false)); + + LOGGER.info("Successfully setup internal services."); + } + private void initConfigManager() { try { configManager = new ConfigManager(); @@ -220,21 +235,6 @@ public class ConfigNode implements ConfigNodeMBean { LOGGER.info("Successfully initialize ConfigManager."); } - private void setUpInternalServices() throws StartupException, IOException { - // Setup JMXService - registerManager.register(new JMXService()); - JMXService.registerMBean(this, mbeanName); - - registerManager.register(MetricService.getInstance()); - // bind predefined metric sets - MetricService.getInstance().addMetricSet(new JvmMetrics()); - MetricService.getInstance().addMetricSet(new LogbackMetrics()); - MetricService.getInstance().addMetricSet(new ProcessMetrics()); - MetricService.getInstance().addMetricSet(new SystemMetrics(false)); - - LOGGER.info("Successfully setup internal services."); - } - /** Register Non-seed ConfigNode when first startup */ private void sendRegisterConfigNodeRequest() throws StartupException, IOException { TConfigNodeRegisterReq req = diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceHandlerMetrics.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceHandlerMetrics.java index 86f48bd6ab..7c3483d72a 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceHandlerMetrics.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceHandlerMetrics.java @@ -28,7 +28,7 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; public class ConfigNodeRPCServiceHandlerMetrics implements IMetricSet { - private AtomicLong thriftConnectionNumber; + private final AtomicLong thriftConnectionNumber; public ConfigNodeRPCServiceHandlerMetrics(AtomicLong thriftConnectionNumber) { this.thriftConnectionNumber = thriftConnectionNumber; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceMetrics.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceMetrics.java index 4b06390cee..e237dd75b0 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceMetrics.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceMetrics.java @@ -29,7 +29,8 @@ import org.apache.iotdb.metrics.utils.MetricType; import java.util.Objects; public class ConfigNodeRPCServiceMetrics implements IMetricSet { - private AbstractThriftServiceThread thriftServiceThread; + + private final AbstractThriftServiceThread thriftServiceThread; public ConfigNodeRPCServiceMetrics(AbstractThriftServiceThread thriftServiceThread) { this.thriftServiceThread = thriftServiceThread; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index 29798f63df..b67daaf6e9 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -63,7 +63,7 @@ import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeRegisterR import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp; import org.apache.iotdb.confignode.consensus.response.partition.RegionInfoListResp; import org.apache.iotdb.confignode.manager.ConfigManager; -import org.apache.iotdb.confignode.manager.ConsensusManager; +import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService; import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq; import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq; diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeStatus.java b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeStatus.java index d19eadfbdd..32f083dd04 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeStatus.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeStatus.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.commons.cluster; /** Node status for showing cluster */ diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeType.java b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeType.java index 38f521f837..ff3fbfdada 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeType.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeType.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.commons.cluster; public enum NodeType { @@ -34,7 +35,7 @@ public enum NodeType { public static NodeType parse(String type) { for (NodeType nodeType : NodeType.values()) { - if (nodeType.nodeType.equals(type)) { + if (type.equals(nodeType.getNodeType())) { return nodeType; } } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/cluster/RegionRoleType.java b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/RegionRoleType.java index 6e65824f7d..95ee99f23b 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/cluster/RegionRoleType.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/RegionRoleType.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.commons.cluster; /** Region Role for showing regions */ diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java index e434f3d14c..63f6c06773 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java @@ -32,8 +32,6 @@ public enum Metric { DATA_WRITTEN, DATA_READ, COMPACTION_TASK_COUNT, - CLUSTER_NODE_STATUS, - CLUSTER_NODE_LEADER_COUNT, PROCESS_CPU_LOAD, PROCESS_CPU_TIME, PROCESS_MAX_MEM, @@ -52,11 +50,15 @@ public enum Metric { SYS_COMMITTED_VM_SIZE, SYS_DISK_TOTAL_SPACE, SYS_DISK_FREE_SPACE, - CONFIG_NODE, - DATA_NODE, - STORAGE_GROUP, - REGION, - SLOT, + + NODE_NUM, + DATABASE_NUM, + REGION_NUM, + REGION_NUM_IN_DATA_NODE, + REGION_GROUP_LEADER_NUM_IN_DATA_NODE, + SERIES_SLOT_NUM_IN_DATABASE, + REGION_GROUP_NUM_IN_DATABASE, + THRIFT_CONNECTIONS, THRIFT_ACTIVE_THREADS, IOT_CONSENSUS,
