This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 92efe44edc [IOTDB-3289][IOTDB-3291][IOTDB-3217] Limit the number of regions (#6087)
92efe44edc is described below

commit 92efe44edc88e9d5f50420787e70c675df97a66a
Author: YongzaoDan <[email protected]>
AuthorDate: Wed Jun 1 22:41:54 2022 +0800

    [IOTDB-3289][IOTDB-3291][IOTDB-3217] Limit the number of regions (#6087)
---
 .../resources/conf/iotdb-confignode.properties     | 10 -----
 .../client/handlers/HeartbeatHandler.java          | 19 ++++++---
 .../iotdb/confignode/conf/ConfigNodeConf.java      | 22 -----------
 .../confignode/conf/ConfigNodeDescriptor.java      | 11 ------
 .../iotdb/confignode/manager/NodeManager.java      | 18 +++++++++
 .../iotdb/confignode/manager/PartitionManager.java | 26 +++++++++++-
 .../iotdb/confignode/manager/load/LoadManager.java | 46 ++++++++++++++++------
 .../iotdb/confignode/persistence/NodeInfo.java     | 20 +++++++---
 .../thrift/ConfigNodeRPCServiceProcessor.java      | 14 +++----
 9 files changed, 110 insertions(+), 76 deletions(-)

diff --git a/confignode/src/assembly/resources/conf/iotdb-confignode.properties b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
index 2d8e8e04fd..49dbe85247 100644
--- a/confignode/src/assembly/resources/conf/iotdb-confignode.properties
+++ b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
@@ -125,16 +125,6 @@ target_confignode=0.0.0.0:22277
 # data_replication_factor=3
 
 
-# The maximum number of SchemaRegions of each StorageGroup
-# Datatype: int
-# maximum_schema_region_count=4
-
-
-# The maximum number of DataRegions of each StorageGroup
-# Datatype: int
-# maximum_data_region_count=20
-
-
 ####################
 ### thrift rpc configuration
 ####################
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/HeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/HeartbeatHandler.java
index 84b8f2ec71..a1b42599c4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/HeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/HeartbeatHandler.java
@@ -18,32 +18,41 @@
  */
 package org.apache.iotdb.confignode.client.handlers;
 
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.THeartbeatResp;
 import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatCache;
 import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatPackage;
 
 import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class HeartbeatHandler implements AsyncMethodCallback<THeartbeatResp> {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(HeartbeatHandler.class);
+
   // Update HeartbeatCache when success
-  private final int dataNodeId;
+  private final TDataNodeLocation dataNodeLocation;
   private final HeartbeatCache heartbeatCache;
 
-  public HeartbeatHandler(int dataNodeId, HeartbeatCache heartbeatCache) {
-    this.dataNodeId = dataNodeId;
+  public HeartbeatHandler(TDataNodeLocation dataNodeLocation, HeartbeatCache 
heartbeatCache) {
+    this.dataNodeLocation = dataNodeLocation;
     this.heartbeatCache = heartbeatCache;
   }
 
   @Override
   public void onComplete(THeartbeatResp tHeartbeatResp) {
     heartbeatCache.cacheHeartBeat(
-        dataNodeId,
+        dataNodeLocation.getDataNodeId(),
         new HeartbeatPackage(tHeartbeatResp.getHeartbeatTimestamp(), 
System.currentTimeMillis()));
   }
 
   @Override
   public void onError(Exception e) {
-    // Just ignore heartbeat error
+    LOGGER.warn(
+        "Heartbeat error on DataNode: {id={}, internalEndPoint={}}",
+        dataNodeLocation.getDataNodeId(),
+        dataNodeLocation.getInternalEndPoint(),
+        e);
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
index 6ec526de45..42bfdb30a8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
@@ -126,12 +126,6 @@ public class ConfigNodeConf {
   /** Default number of DataRegion replicas */
   private int dataReplicationFactor = 3;
 
-  /** The maximum number of SchemaRegions of each StorageGroup */
-  private int maximumSchemaRegionCount = 4;
-
-  /** The maximum number of DataRegions of each StorageGroup */
-  private int maximumDataRegionCount = 20;
-
   /** Procedure Evict ttl */
   private int procedureCompletedEvictTTL = 800;
 
@@ -393,22 +387,6 @@ public class ConfigNodeConf {
     this.dataReplicationFactor = dataReplicationFactor;
   }
 
-  public int getMaximumSchemaRegionCount() {
-    return maximumSchemaRegionCount;
-  }
-
-  public void setMaximumSchemaRegionCount(int maximumSchemaRegionCount) {
-    this.maximumSchemaRegionCount = maximumSchemaRegionCount;
-  }
-
-  public int getMaximumDataRegionCount() {
-    return maximumDataRegionCount;
-  }
-
-  public void setMaximumDataRegionCount(int maximumDataRegionCount) {
-    this.maximumDataRegionCount = maximumDataRegionCount;
-  }
-
   public int getProcedureCompletedEvictTTL() {
     return procedureCompletedEvictTTL;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 4b7c145a0a..36ea3f2008 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -200,17 +200,6 @@ public class ConfigNodeDescriptor {
               properties.getProperty(
                   "data_replication_factor", 
String.valueOf(conf.getDataReplicationFactor()))));
 
-      conf.setMaximumSchemaRegionCount(
-          Integer.parseInt(
-              properties.getProperty(
-                  "maximum_schema_region_count",
-                  String.valueOf(conf.getMaximumSchemaRegionCount()))));
-
-      conf.setMaximumDataRegionCount(
-          Integer.parseInt(
-              properties.getProperty(
-                  "maximum_data_region_count", 
String.valueOf(conf.getMaximumDataRegionCount()))));
-
       conf.setHeartbeatInterval(
           Long.parseLong(
               properties.getProperty(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index af436f2fa1..e4cfb0c44e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -117,6 +117,24 @@ public class NodeManager {
     return (DataNodeInfosResp) getConsensusManager().read(req).getDataset();
   }
 
+  /**
+   * Only leader use this interface
+   *
+   * @return The number of online DataNodes
+   */
+  public int getOnlineDataNodeCount() {
+    return nodeInfo.getOnlineDataNodeCount();
+  }
+
+  /**
+   * Only leader use this interface
+   *
+   * @return The number of total cpu cores in online DataNodes
+   */
+  public int getTotalCpuCoreCount() {
+    return nodeInfo.getTotalCpuCoreCount();
+  }
+
   /**
    * Only leader use this interface
    *
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index d672f68cd4..c97c4b66bf 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -312,7 +312,27 @@ public class PartitionManager {
         storageGroupWithoutRegion.add(storageGroup);
       }
     }
-    getLoadManager().initializeRegions(storageGroupWithoutRegion, 
consensusGroupType);
+
+    // Calculate the number of Regions
+    // TODO: This is a temporary code, delete it later
+    int regionNum;
+    if (consensusGroupType == TConsensusGroupType.SchemaRegion) {
+      regionNum =
+          Math.max(
+              1,
+              getNodeManager().getOnlineDataNodeCount()
+                  / 
ConfigNodeDescriptor.getInstance().getConf().getSchemaReplicationFactor());
+    } else {
+      regionNum =
+          Math.max(
+              2,
+              (int)
+                  (getNodeManager().getTotalCpuCoreCount()
+                      * 0.3
+                      / 
ConfigNodeDescriptor.getInstance().getConf().getDataReplicationFactor()));
+    }
+
+    getLoadManager().initializeRegions(storageGroupWithoutRegion, 
consensusGroupType, regionNum);
   }
 
   /** Get all allocated RegionReplicaSets */
@@ -400,6 +420,10 @@ public class PartitionManager {
     return configManager.getConsensusManager();
   }
 
+  private NodeManager getNodeManager() {
+    return configManager.getNodeManager();
+  }
+
   private ClusterSchemaManager getClusterSchemaManager() {
     return configManager.getClusterSchemaManager();
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 55654b1437..daa37f7a24 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -82,14 +82,16 @@ public class LoadManager implements Runnable {
    *
    * @param storageGroups List<StorageGroupName>
    * @param consensusGroupType TConsensusGroupType of Region to be allocated
+   * @param regionNum The number of Regions
    */
-  public void initializeRegions(List<String> storageGroups, 
TConsensusGroupType consensusGroupType)
+  public void initializeRegions(
+      List<String> storageGroups, TConsensusGroupType consensusGroupType, int 
regionNum)
       throws NotEnoughDataNodeException {
     CreateRegionsReq createRegionsReq = null;
 
     try {
       createRegionsReq =
-          regionBalancer.genRegionsAllocationPlan(storageGroups, 
consensusGroupType, 1);
+          regionBalancer.genRegionsAllocationPlan(storageGroups, 
consensusGroupType, regionNum);
       createRegionsOnDataNodes(createRegionsReq);
     } catch (MetadataException e) {
       LOGGER.error("Meet error when create Regions", e);
@@ -136,31 +138,48 @@ public class LoadManager implements Runnable {
 
         if (totalReplicaNum < totalCoreNum * 0.5) {
           // Allocate more Regions
-          CreateRegionsReq createRegionsReq;
+          CreateRegionsReq createRegionsReq = null;
+
+          // Assume that cluster will get the best efficiency when 
SchemaRegion:DataRegion is 1:5
+          // TODO: Find an optimal SchemaRegion:DataRegion rate.
           if (storageGroupSchema.getSchemaRegionGroupIdsSize() * 5
-              > storageGroupSchema.getDataRegionGroupIdsSize()) {
-            // TODO: Find an optimal SchemaRegion:DataRegion rate
-            // Currently, we just assume that it's 1:5
+                  > storageGroupSchema.getDataRegionGroupIdsSize()
+              && storageGroupSchema.getDataRegionGroupIdsSize()
+                  < storageGroupSchema.getMaximumDataRegionCount()) {
+            // Allocate more DataRegions
+
+            // regionNum equals to min(remain cpu core,
+            // min(SchemaRegionCnt * 5 - DataRegionCnt, MaxDataRegionCnt - 
DataRegionCnt))
             int regionNum =
                 Math.min(
                     ((int) (totalCoreNum * 0.5) - totalReplicaNum)
                         / storageGroupSchema.getDataReplicationFactor(),
-                    storageGroupSchema.getSchemaRegionGroupIdsSize() * 5
-                        - storageGroupSchema.getDataRegionGroupIdsSize());
+                    Math.min(
+                        storageGroupSchema.getSchemaRegionGroupIdsSize() * 5
+                            - storageGroupSchema.getDataRegionGroupIdsSize(),
+                        storageGroupSchema.getMaximumDataRegionCount()
+                            - storageGroupSchema.getDataRegionGroupIdsSize()));
+
             createRegionsReq =
                 regionBalancer.genRegionsAllocationPlan(
                     Collections.singletonList(storageGroup),
                     TConsensusGroupType.DataRegion,
                     regionNum);
-          } else {
+          } else if (storageGroupSchema.getSchemaRegionGroupIdsSize() * 5
+                  <= storageGroupSchema.getDataRegionGroupIdsSize()
+              && storageGroupSchema.getSchemaRegionGroupIdsSize()
+                  < storageGroupSchema.getMaximumSchemaRegionCount()) {
+            // Allocate one more SchemaRegion
             createRegionsReq =
                 regionBalancer.genRegionsAllocationPlan(
                     Collections.singletonList(storageGroup), 
TConsensusGroupType.SchemaRegion, 1);
           }
 
           // TODO: use procedure to protect this
-          createRegionsOnDataNodes(createRegionsReq);
-          getConsensusManager().write(createRegionsReq);
+          if (createRegionsReq != null) {
+            createRegionsOnDataNodes(createRegionsReq);
+            getConsensusManager().write(createRegionsReq);
+          }
         }
       } catch (MetadataException e) {
         LOGGER.warn("Meet error when doing regionExpansion", e);
@@ -186,7 +205,7 @@ public class LoadManager implements Runnable {
           List<TDataNodeInfo> onlineDataNodes = 
getNodeManager().getOnlineDataNodes(-1);
           for (TDataNodeInfo dataNodeInfo : onlineDataNodes) {
             HeartbeatHandler handler =
-                new 
HeartbeatHandler(dataNodeInfo.getLocation().getDataNodeId(), heartbeatCache);
+                new HeartbeatHandler(dataNodeInfo.getLocation(), 
heartbeatCache);
             AsyncDataNodeClientPool.getInstance()
                 .getHeartBeat(
                     dataNodeInfo.getLocation().getInternalEndPoint(), 
genHeartbeatReq(), handler);
@@ -195,7 +214,8 @@ public class LoadManager implements Runnable {
           balanceCount += 1;
           // TODO: Adjust load balancing period
           if (balanceCount == 10) {
-            doLoadBalancing();
+            // Pause load balancing temporary
+            // doLoadBalancing();
             balanceCount = 0;
           }
         } else {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
index 0cd49efaea..69dfc67233 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
@@ -151,11 +151,6 @@ public class NodeInfo implements SnapshotProcessor {
       } else if (nextDataNodeId.get() == minimumDataNode) {
         result.setMessage("IoTDB-Cluster could provide data service, now enjoy 
yourself!");
       }
-
-      LOGGER.info(
-          "Successfully register DataNode: {}. Current online DataNodes: {}",
-          info.getLocation(),
-          onlineDataNodes);
     } finally {
       dataNodeInfoReadWriteLock.writeLock().unlock();
     }
@@ -189,6 +184,7 @@ public class NodeInfo implements SnapshotProcessor {
     return result;
   }
 
+  /** Return the number of online DataNodes */
   public int getOnlineDataNodeCount() {
     int result;
     dataNodeInfoReadWriteLock.readLock().lock();
@@ -200,6 +196,20 @@ public class NodeInfo implements SnapshotProcessor {
     return result;
   }
 
+  /** Return the number of total cpu cores in online DataNodes */
+  public int getTotalCpuCoreCount() {
+    int result = 0;
+    dataNodeInfoReadWriteLock.readLock().lock();
+    try {
+      for (TDataNodeInfo info : onlineDataNodes.values()) {
+        result += info.getCpuCoreNum();
+      }
+    } finally {
+      dataNodeInfoReadWriteLock.readLock().unlock();
+    }
+    return result;
+  }
+
   /**
    * Return the specific online DataNode
    *
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 4a6a8cb6b2..5a457a3da9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -161,7 +161,7 @@ public class ConfigNodeRPCServiceProcessor implements ConfigIService.Iface {
   public TSStatus setStorageGroup(TSetStorageGroupReq req) throws TException {
     TStorageGroupSchema storageGroupSchema = req.getStorageGroup();
 
-    // Set default configurations
+    // Set default configurations if necessary
     if (!storageGroupSchema.isSetTTL()) {
       
storageGroupSchema.setTTL(CommonDescriptor.getInstance().getConfig().getDefaultTTL());
     }
@@ -177,14 +177,10 @@ public class ConfigNodeRPCServiceProcessor implements ConfigIService.Iface {
       storageGroupSchema.setTimePartitionInterval(
           
ConfigNodeDescriptor.getInstance().getConf().getTimePartitionInterval());
     }
-    if (!storageGroupSchema.isSetMaximumSchemaRegionCount()) {
-      storageGroupSchema.setMaximumSchemaRegionCount(
-          
ConfigNodeDescriptor.getInstance().getConf().getMaximumSchemaRegionCount());
-    }
-    if (!storageGroupSchema.isSetMaximumDataRegionCount()) {
-      storageGroupSchema.setMaximumDataRegionCount(
-          
ConfigNodeDescriptor.getInstance().getConf().getMaximumDataRegionCount());
-    }
+
+    // Mark the StorageGroup as SchemaRegions and DataRegions not yet created
+    storageGroupSchema.setMaximumSchemaRegionCount(-1);
+    storageGroupSchema.setMaximumDataRegionCount(-1);
 
     // Initialize RegionGroupId List
     storageGroupSchema.setSchemaRegionGroupIds(new ArrayList<>());

Reply via email to