This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 92efe44edc [IOTDB-3289][IOTDB-3291][IOTDB-3217] Limit the number of regions (#6087)
92efe44edc is described below
commit 92efe44edc88e9d5f50420787e70c675df97a66a
Author: YongzaoDan <[email protected]>
AuthorDate: Wed Jun 1 22:41:54 2022 +0800
[IOTDB-3289][IOTDB-3291][IOTDB-3217] Limit the number of regions (#6087)
---
.../resources/conf/iotdb-confignode.properties | 10 -----
.../client/handlers/HeartbeatHandler.java | 19 ++++++---
.../iotdb/confignode/conf/ConfigNodeConf.java | 22 -----------
.../confignode/conf/ConfigNodeDescriptor.java | 11 ------
.../iotdb/confignode/manager/NodeManager.java | 18 +++++++++
.../iotdb/confignode/manager/PartitionManager.java | 26 +++++++++++-
.../iotdb/confignode/manager/load/LoadManager.java | 46 ++++++++++++++++------
.../iotdb/confignode/persistence/NodeInfo.java | 20 +++++++---
.../thrift/ConfigNodeRPCServiceProcessor.java | 14 +++----
9 files changed, 110 insertions(+), 76 deletions(-)
diff --git a/confignode/src/assembly/resources/conf/iotdb-confignode.properties b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
index 2d8e8e04fd..49dbe85247 100644
--- a/confignode/src/assembly/resources/conf/iotdb-confignode.properties
+++ b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
@@ -125,16 +125,6 @@ target_confignode=0.0.0.0:22277
# data_replication_factor=3
-# The maximum number of SchemaRegions of each StorageGroup
-# Datatype: int
-# maximum_schema_region_count=4
-
-
-# The maximum number of DataRegions of each StorageGroup
-# Datatype: int
-# maximum_data_region_count=20
-
-
####################
### thrift rpc configuration
####################
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/HeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/HeartbeatHandler.java
index 84b8f2ec71..a1b42599c4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/HeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/HeartbeatHandler.java
@@ -18,32 +18,41 @@
*/
package org.apache.iotdb.confignode.client.handlers;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.THeartbeatResp;
import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatCache;
import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatPackage;
import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class HeartbeatHandler implements AsyncMethodCallback<THeartbeatResp> {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(HeartbeatHandler.class);
+
// Update HeartbeatCache when success
- private final int dataNodeId;
+ private final TDataNodeLocation dataNodeLocation;
private final HeartbeatCache heartbeatCache;
- public HeartbeatHandler(int dataNodeId, HeartbeatCache heartbeatCache) {
- this.dataNodeId = dataNodeId;
+ public HeartbeatHandler(TDataNodeLocation dataNodeLocation, HeartbeatCache
heartbeatCache) {
+ this.dataNodeLocation = dataNodeLocation;
this.heartbeatCache = heartbeatCache;
}
@Override
public void onComplete(THeartbeatResp tHeartbeatResp) {
heartbeatCache.cacheHeartBeat(
- dataNodeId,
+ dataNodeLocation.getDataNodeId(),
new HeartbeatPackage(tHeartbeatResp.getHeartbeatTimestamp(),
System.currentTimeMillis()));
}
@Override
public void onError(Exception e) {
- // Just ignore heartbeat error
+ LOGGER.warn(
+ "Heartbeat error on DataNode: {id={}, internalEndPoint={}}",
+ dataNodeLocation.getDataNodeId(),
+ dataNodeLocation.getInternalEndPoint(),
+ e);
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
index 6ec526de45..42bfdb30a8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
@@ -126,12 +126,6 @@ public class ConfigNodeConf {
/** Default number of DataRegion replicas */
private int dataReplicationFactor = 3;
- /** The maximum number of SchemaRegions of each StorageGroup */
- private int maximumSchemaRegionCount = 4;
-
- /** The maximum number of DataRegions of each StorageGroup */
- private int maximumDataRegionCount = 20;
-
/** Procedure Evict ttl */
private int procedureCompletedEvictTTL = 800;
@@ -393,22 +387,6 @@ public class ConfigNodeConf {
this.dataReplicationFactor = dataReplicationFactor;
}
- public int getMaximumSchemaRegionCount() {
- return maximumSchemaRegionCount;
- }
-
- public void setMaximumSchemaRegionCount(int maximumSchemaRegionCount) {
- this.maximumSchemaRegionCount = maximumSchemaRegionCount;
- }
-
- public int getMaximumDataRegionCount() {
- return maximumDataRegionCount;
- }
-
- public void setMaximumDataRegionCount(int maximumDataRegionCount) {
- this.maximumDataRegionCount = maximumDataRegionCount;
- }
-
public int getProcedureCompletedEvictTTL() {
return procedureCompletedEvictTTL;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 4b7c145a0a..36ea3f2008 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -200,17 +200,6 @@ public class ConfigNodeDescriptor {
properties.getProperty(
"data_replication_factor",
String.valueOf(conf.getDataReplicationFactor()))));
- conf.setMaximumSchemaRegionCount(
- Integer.parseInt(
- properties.getProperty(
- "maximum_schema_region_count",
- String.valueOf(conf.getMaximumSchemaRegionCount()))));
-
- conf.setMaximumDataRegionCount(
- Integer.parseInt(
- properties.getProperty(
- "maximum_data_region_count",
String.valueOf(conf.getMaximumDataRegionCount()))));
-
conf.setHeartbeatInterval(
Long.parseLong(
properties.getProperty(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index af436f2fa1..e4cfb0c44e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -117,6 +117,24 @@ public class NodeManager {
return (DataNodeInfosResp) getConsensusManager().read(req).getDataset();
}
+ /**
+ * Only leader use this interface
+ *
+ * @return The number of online DataNodes
+ */
+ public int getOnlineDataNodeCount() {
+ return nodeInfo.getOnlineDataNodeCount();
+ }
+
+ /**
+ * Only leader use this interface
+ *
+ * @return The number of total cpu cores in online DataNodes
+ */
+ public int getTotalCpuCoreCount() {
+ return nodeInfo.getTotalCpuCoreCount();
+ }
+
/**
* Only leader use this interface
*
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index d672f68cd4..c97c4b66bf 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -312,7 +312,27 @@ public class PartitionManager {
storageGroupWithoutRegion.add(storageGroup);
}
}
- getLoadManager().initializeRegions(storageGroupWithoutRegion,
consensusGroupType);
+
+ // Calculate the number of Regions
+ // TODO: This is a temporary code, delete it later
+ int regionNum;
+ if (consensusGroupType == TConsensusGroupType.SchemaRegion) {
+ regionNum =
+ Math.max(
+ 1,
+ getNodeManager().getOnlineDataNodeCount()
+ /
ConfigNodeDescriptor.getInstance().getConf().getSchemaReplicationFactor());
+ } else {
+ regionNum =
+ Math.max(
+ 2,
+ (int)
+ (getNodeManager().getTotalCpuCoreCount()
+ * 0.3
+ /
ConfigNodeDescriptor.getInstance().getConf().getDataReplicationFactor()));
+ }
+
+ getLoadManager().initializeRegions(storageGroupWithoutRegion,
consensusGroupType, regionNum);
}
/** Get all allocated RegionReplicaSets */
@@ -400,6 +420,10 @@ public class PartitionManager {
return configManager.getConsensusManager();
}
+ private NodeManager getNodeManager() {
+ return configManager.getNodeManager();
+ }
+
private ClusterSchemaManager getClusterSchemaManager() {
return configManager.getClusterSchemaManager();
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 55654b1437..daa37f7a24 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -82,14 +82,16 @@ public class LoadManager implements Runnable {
*
* @param storageGroups List<StorageGroupName>
* @param consensusGroupType TConsensusGroupType of Region to be allocated
+ * @param regionNum The number of Regions
*/
- public void initializeRegions(List<String> storageGroups,
TConsensusGroupType consensusGroupType)
+ public void initializeRegions(
+ List<String> storageGroups, TConsensusGroupType consensusGroupType, int
regionNum)
throws NotEnoughDataNodeException {
CreateRegionsReq createRegionsReq = null;
try {
createRegionsReq =
- regionBalancer.genRegionsAllocationPlan(storageGroups,
consensusGroupType, 1);
+ regionBalancer.genRegionsAllocationPlan(storageGroups,
consensusGroupType, regionNum);
createRegionsOnDataNodes(createRegionsReq);
} catch (MetadataException e) {
LOGGER.error("Meet error when create Regions", e);
@@ -136,31 +138,48 @@ public class LoadManager implements Runnable {
if (totalReplicaNum < totalCoreNum * 0.5) {
// Allocate more Regions
- CreateRegionsReq createRegionsReq;
+ CreateRegionsReq createRegionsReq = null;
+
+ // Assume that cluster will get the best efficiency when
SchemaRegion:DataRegion is 1:5
+ // TODO: Find an optimal SchemaRegion:DataRegion rate.
if (storageGroupSchema.getSchemaRegionGroupIdsSize() * 5
- > storageGroupSchema.getDataRegionGroupIdsSize()) {
- // TODO: Find an optimal SchemaRegion:DataRegion rate
- // Currently, we just assume that it's 1:5
+ > storageGroupSchema.getDataRegionGroupIdsSize()
+ && storageGroupSchema.getDataRegionGroupIdsSize()
+ < storageGroupSchema.getMaximumDataRegionCount()) {
+ // Allocate more DataRegions
+
+ // regionNum equals to min(remain cpu core,
+ // min(SchemaRegionCnt * 5 - DataRegionCnt, MaxDataRegionCnt -
DataRegionCnt))
int regionNum =
Math.min(
((int) (totalCoreNum * 0.5) - totalReplicaNum)
/ storageGroupSchema.getDataReplicationFactor(),
- storageGroupSchema.getSchemaRegionGroupIdsSize() * 5
- - storageGroupSchema.getDataRegionGroupIdsSize());
+ Math.min(
+ storageGroupSchema.getSchemaRegionGroupIdsSize() * 5
+ - storageGroupSchema.getDataRegionGroupIdsSize(),
+ storageGroupSchema.getMaximumDataRegionCount()
+ - storageGroupSchema.getDataRegionGroupIdsSize()));
+
createRegionsReq =
regionBalancer.genRegionsAllocationPlan(
Collections.singletonList(storageGroup),
TConsensusGroupType.DataRegion,
regionNum);
- } else {
+ } else if (storageGroupSchema.getSchemaRegionGroupIdsSize() * 5
+ <= storageGroupSchema.getDataRegionGroupIdsSize()
+ && storageGroupSchema.getSchemaRegionGroupIdsSize()
+ < storageGroupSchema.getMaximumSchemaRegionCount()) {
+ // Allocate one more SchemaRegion
createRegionsReq =
regionBalancer.genRegionsAllocationPlan(
Collections.singletonList(storageGroup),
TConsensusGroupType.SchemaRegion, 1);
}
// TODO: use procedure to protect this
- createRegionsOnDataNodes(createRegionsReq);
- getConsensusManager().write(createRegionsReq);
+ if (createRegionsReq != null) {
+ createRegionsOnDataNodes(createRegionsReq);
+ getConsensusManager().write(createRegionsReq);
+ }
}
} catch (MetadataException e) {
LOGGER.warn("Meet error when doing regionExpansion", e);
@@ -186,7 +205,7 @@ public class LoadManager implements Runnable {
List<TDataNodeInfo> onlineDataNodes =
getNodeManager().getOnlineDataNodes(-1);
for (TDataNodeInfo dataNodeInfo : onlineDataNodes) {
HeartbeatHandler handler =
- new
HeartbeatHandler(dataNodeInfo.getLocation().getDataNodeId(), heartbeatCache);
+ new HeartbeatHandler(dataNodeInfo.getLocation(),
heartbeatCache);
AsyncDataNodeClientPool.getInstance()
.getHeartBeat(
dataNodeInfo.getLocation().getInternalEndPoint(),
genHeartbeatReq(), handler);
@@ -195,7 +214,8 @@ public class LoadManager implements Runnable {
balanceCount += 1;
// TODO: Adjust load balancing period
if (balanceCount == 10) {
- doLoadBalancing();
+ // Pause load balancing temporary
+ // doLoadBalancing();
balanceCount = 0;
}
} else {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
index 0cd49efaea..69dfc67233 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
@@ -151,11 +151,6 @@ public class NodeInfo implements SnapshotProcessor {
} else if (nextDataNodeId.get() == minimumDataNode) {
result.setMessage("IoTDB-Cluster could provide data service, now enjoy
yourself!");
}
-
- LOGGER.info(
- "Successfully register DataNode: {}. Current online DataNodes: {}",
- info.getLocation(),
- onlineDataNodes);
} finally {
dataNodeInfoReadWriteLock.writeLock().unlock();
}
@@ -189,6 +184,7 @@ public class NodeInfo implements SnapshotProcessor {
return result;
}
+ /** Return the number of online DataNodes */
public int getOnlineDataNodeCount() {
int result;
dataNodeInfoReadWriteLock.readLock().lock();
@@ -200,6 +196,20 @@ public class NodeInfo implements SnapshotProcessor {
return result;
}
+ /** Return the number of total cpu cores in online DataNodes */
+ public int getTotalCpuCoreCount() {
+ int result = 0;
+ dataNodeInfoReadWriteLock.readLock().lock();
+ try {
+ for (TDataNodeInfo info : onlineDataNodes.values()) {
+ result += info.getCpuCoreNum();
+ }
+ } finally {
+ dataNodeInfoReadWriteLock.readLock().unlock();
+ }
+ return result;
+ }
+
/**
* Return the specific online DataNode
*
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 4a6a8cb6b2..5a457a3da9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -161,7 +161,7 @@ public class ConfigNodeRPCServiceProcessor implements ConfigIService.Iface {
public TSStatus setStorageGroup(TSetStorageGroupReq req) throws TException {
TStorageGroupSchema storageGroupSchema = req.getStorageGroup();
- // Set default configurations
+ // Set default configurations if necessary
if (!storageGroupSchema.isSetTTL()) {
storageGroupSchema.setTTL(CommonDescriptor.getInstance().getConfig().getDefaultTTL());
}
@@ -177,14 +177,10 @@ public class ConfigNodeRPCServiceProcessor implements ConfigIService.Iface {
storageGroupSchema.setTimePartitionInterval(
ConfigNodeDescriptor.getInstance().getConf().getTimePartitionInterval());
}
- if (!storageGroupSchema.isSetMaximumSchemaRegionCount()) {
- storageGroupSchema.setMaximumSchemaRegionCount(
-
ConfigNodeDescriptor.getInstance().getConf().getMaximumSchemaRegionCount());
- }
- if (!storageGroupSchema.isSetMaximumDataRegionCount()) {
- storageGroupSchema.setMaximumDataRegionCount(
-
ConfigNodeDescriptor.getInstance().getConf().getMaximumDataRegionCount());
- }
+
+ // Mark the StorageGroup as SchemaRegions and DataRegions not yet created
+ storageGroupSchema.setMaximumSchemaRegionCount(-1);
+ storageGroupSchema.setMaximumDataRegionCount(-1);
// Initialize RegionGroupId List
storageGroupSchema.setSchemaRegionGroupIds(new ArrayList<>());