This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6f0b2f6da8 [IoTDB-3053] Copyset algorithm when create timeseries
(#5862)
6f0b2f6da8 is described below
commit 6f0b2f6da8042aaefa2bd420cc45a9d43d41ade5
Author: YongzaoDan <[email protected]>
AuthorDate: Wed May 11 23:31:11 2022 +0800
[IoTDB-3053] Copyset algorithm when create timeseries (#5862)
---
.../resources/conf/iotdb-confignode.properties | 23 +--
.../confignode/client/AsyncDataNodeClientPool.java | 14 +-
...RegionHandler.java => CreateRegionHandler.java} | 41 +++-
.../iotdb/confignode/conf/ConfigNodeConf.java | 40 ++--
.../confignode/conf/ConfigNodeDescriptor.java | 12 +-
.../confignode/conf/ConfigNodeStartupCheck.java | 96 ++++-----
.../consensus/request/write/CreateRegionsReq.java | 38 ++--
.../NotEnoughDataNodeException.java} | 18 +-
.../confignode/manager/ClusterSchemaManager.java | 199 +++----------------
.../iotdb/confignode/manager/ConfigManager.java | 12 +-
.../iotdb/confignode/manager/LoadManager.java | 216 ++++++++++++++++++++-
.../apache/iotdb/confignode/manager/Manager.java | 9 +-
.../iotdb/confignode/manager/NodeManager.java | 17 +-
.../iotdb/confignode/manager/PartitionManager.java | 97 +++++++--
.../manager/allocator/CopySetRegionAllocator.java | 155 +++++++++++++++
.../manager/allocator/IRegionAllocator.java | 44 +++++
.../RegionBalancer.java} | 17 +-
.../SeriesPartitionSlotBalancer.java} | 17 +-
.../confignode/persistence/ClusterSchemaInfo.java | 68 ++++++-
.../iotdb/confignode/persistence/NodeInfo.java | 4 +
.../confignode/persistence/PartitionInfo.java | 22 ++-
.../executor/ConfigRequestExecutor.java | 5 +
.../thrift/ConfigNodeRPCServiceProcessor.java | 9 +
.../consensus/request/ConfigRequestSerDeTest.java | 4 +-
.../confignode/persistence/PartitionInfoTest.java | 2 +-
.../thrift/ConfigNodeRPCServiceProcessorTest.java | 7 +-
.../iotdb/commons/consensus/ConsensusGroupId.java | 20 ++
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 3 +-
.../src/main/thrift/confignode.thrift | 6 +-
29 files changed, 834 insertions(+), 381 deletions(-)
diff --git a/confignode/src/assembly/resources/conf/iotdb-confignode.properties
b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
index 05d4d9efaf..9b05d05c0f 100644
--- a/confignode/src/assembly/resources/conf/iotdb-confignode.properties
+++ b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
@@ -117,14 +117,14 @@ target_confignode=0.0.0.0:22277
# data_replication_factor=3
-# The initial number of SchemaRegions of each StorageGroup
+# The maximum number of SchemaRegions of each StorageGroup
# Datatype: int
-# initial_schema_region_count=1
+# maximum_schema_region_count=4
-# The initial number of DataRegions of each StorageGroup
+# The maximum number of DataRegions of each StorageGroup
# Datatype: int
-# initial_data_region_count=1
+# maximum_data_region_count=20
####################
@@ -171,8 +171,6 @@ target_confignode=0.0.0.0:22277
### Directory configuration
####################
-# All parameters in Directory configuration is unmodifiable after ConfigNode
starts for the first time.
-
# system dir
# If this property is unset, system will save the data in the default relative
path directory under the confignode folder(i.e., %CONFIGNODE_HOME%/data/system).
@@ -186,19 +184,6 @@ target_confignode=0.0.0.0:22277
# system_dir=data/system
-# data dirs
-# If this property is unset, system will save the data in the default relative
path directory under the confignode folder(i.e., %CONFIGNODE_HOME%/data/data).
-# If it is absolute, system will save the data in exact location it points to.
-# If it is relative, system will save the data in the relative path directory
it indicates under the confignode folder.
-# Note: If data_dir is assigned an empty string(i.e.,zero-size), it will be
handled as a relative path.
-# For windows platform
-# If its prefix is a drive specifier followed by "\\", or if its prefix is
"\\\\", then the path is absolute. Otherwise, it is relative.
-# data_dirs=data\\data
-# For Linux platform
-# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
-# data_dirs=data/data
-
-
# consensus dir
# If this property is unset, system will save the data in the default relative
path directory under the confignode folder(i.e.,
%CONFIGNODE_HOME%/data/consensus).
# If it is absolute, system will save the data in exact location it points to.
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
index f294db6377..b4333651c1 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.confignode.client;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
-import org.apache.iotdb.confignode.client.handlers.InitRegionHandler;
+import org.apache.iotdb.confignode.client.handlers.CreateRegionHandler;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
@@ -46,12 +46,12 @@ public class AsyncDataNodeClientPool {
}
/**
- * Only use this interface when initialize SchemaRegion to set StorageGroup
+ * Create a SchemaRegion on specific DataNode
*
* @param endPoint The specific DataNode
*/
- public void initSchemaRegion(
- TEndPoint endPoint, TCreateSchemaRegionReq req, InitRegionHandler
handler) {
+ public void createSchemaRegion(
+ TEndPoint endPoint, TCreateSchemaRegionReq req, CreateRegionHandler
handler) {
AsyncDataNodeInternalServiceClient client;
try {
client = clientManager.borrowClient(endPoint);
@@ -64,12 +64,12 @@ public class AsyncDataNodeClientPool {
}
/**
- * Only use this interface when initialize SchemaRegion to set StorageGroup
+ * Create a DataRegion on specific DataNode
*
* @param endPoint The specific DataNode
*/
- public void initDataRegion(
- TEndPoint endPoint, TCreateDataRegionReq req, InitRegionHandler handler)
{
+ public void createDataRegion(
+ TEndPoint endPoint, TCreateDataRegionReq req, CreateRegionHandler
handler) {
AsyncDataNodeInternalServiceClient client;
try {
client = clientManager.borrowClient(endPoint);
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/InitRegionHandler.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/CreateRegionHandler.java
similarity index 51%
rename from
confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/InitRegionHandler.java
rename to
confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/CreateRegionHandler.java
index 4be95bb347..fbcca3e121 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/InitRegionHandler.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/CreateRegionHandler.java
@@ -18,7 +18,10 @@
*/
package org.apache.iotdb.confignode.client.handlers;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.async.AsyncMethodCallback;
@@ -28,19 +31,33 @@ import org.slf4j.LoggerFactory;
import java.util.BitSet;
import java.util.concurrent.CountDownLatch;
-/** Only use this handler when initialize Region to set StorageGroup */
-public class InitRegionHandler implements AsyncMethodCallback<TSStatus> {
+/** Only use CreateRegionHandler when the LoadManager wants to create Regions
*/
+public class CreateRegionHandler implements AsyncMethodCallback<TSStatus> {
- private static final Logger LOGGER =
LoggerFactory.getLogger(InitRegionHandler.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(CreateRegionHandler.class);
+ // Mark BitSet when successfully create
private final int index;
private final BitSet bitSet;
+
+ // Used to protect asynchronous creation
private final CountDownLatch latch;
- public InitRegionHandler(int index, BitSet bitSet, CountDownLatch latch) {
+ // Used for Logger
+ private final TConsensusGroupId consensusGroupId;
+ private final TDataNodeLocation dataNodeLocation;
+
+ public CreateRegionHandler(
+ int index,
+ BitSet bitSet,
+ CountDownLatch latch,
+ TConsensusGroupId consensusGroupId,
+ TDataNodeLocation dataNodeLocation) {
this.index = index;
this.bitSet = bitSet;
this.latch = latch;
+ this.consensusGroupId = consensusGroupId;
+ this.dataNodeLocation = dataNodeLocation;
}
@Override
@@ -49,15 +66,27 @@ public class InitRegionHandler implements
AsyncMethodCallback<TSStatus> {
synchronized (bitSet) {
bitSet.set(index);
}
+ LOGGER.info(
+ String.format(
+ "Successfully create %s on DataNode: %s",
+ ConsensusGroupId.formatTConsensusGroupId(consensusGroupId),
dataNodeLocation));
} else {
- LOGGER.error(tsStatus.toString());
+ LOGGER.error(
+ String.format(
+ "Create %s on DataNode: %s failed, %s",
+ ConsensusGroupId.formatTConsensusGroupId(consensusGroupId),
+ dataNodeLocation,
+ tsStatus));
}
latch.countDown();
}
@Override
public void onError(Exception e) {
- LOGGER.error(e.getMessage());
+ LOGGER.error(
+ String.format(
+ "Create %s on DataNode: %s failed, %s",
+ ConsensusGroupId.formatTConsensusGroupId(consensusGroupId),
dataNodeLocation, e));
latch.countDown();
}
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
index 845384c536..e26e9026aa 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
@@ -99,11 +99,6 @@ public class ConfigNodeConf {
private String systemDir =
ConfigNodeConstant.DATA_DIR + File.separator +
IoTDBConstant.SYSTEM_FOLDER_NAME;
- /** Data directory of data. It can be settled as dataDirs = {"data1",
"data2", "data3"}; */
- private String[] dataDirs = {
- ConfigNodeConstant.DATA_DIR + File.separator + ConfigNodeConstant.DATA_DIR
- };
-
/** Consensus directory, storage consensus protocol logs */
private String consensusDir =
ConfigNodeConstant.DATA_DIR + File.separator +
ConfigNodeConstant.CONSENSUS_FOLDER;
@@ -117,11 +112,11 @@ public class ConfigNodeConf {
/** Default number of DataRegion replicas */
private int dataReplicationFactor = 3;
- /** The initial number of SchemaRegions of each StorageGroup */
- private int initialSchemaRegionCount = 1;
+ /** The maximum number of SchemaRegions of each StorageGroup */
+ private int maximumSchemaRegionCount = 4;
- /** The initial number of DataRegions of each StorageGroup */
- private int initialDataRegionCount = 1;
+ /** The maximum number of DataRegions of each StorageGroup */
+ private int maximumDataRegionCount = 20;
ConfigNodeConf() {
// empty constructor
@@ -133,9 +128,6 @@ public class ConfigNodeConf {
private void formulateFolders() {
systemDir = addHomeDir(systemDir);
- for (int i = 0; i < dataDirs.length; i++) {
- dataDirs[i] = addHomeDir(dataDirs[i]);
- }
consensusDir = addHomeDir(consensusDir);
}
@@ -324,14 +316,6 @@ public class ConfigNodeConf {
this.systemDir = systemDir;
}
- public String[] getDataDirs() {
- return dataDirs;
- }
-
- public void setDataDirs(String[] dataDirs) {
- this.dataDirs = dataDirs;
- }
-
public int getSchemaReplicationFactor() {
return schemaReplicationFactor;
}
@@ -348,19 +332,19 @@ public class ConfigNodeConf {
this.dataReplicationFactor = dataReplicationFactor;
}
- public int getInitialSchemaRegionCount() {
- return initialSchemaRegionCount;
+ public int getMaximumSchemaRegionCount() {
+ return maximumSchemaRegionCount;
}
- public void setInitialSchemaRegionCount(int initialSchemaRegionCount) {
- this.initialSchemaRegionCount = initialSchemaRegionCount;
+ public void setMaximumSchemaRegionCount(int maximumSchemaRegionCount) {
+ this.maximumSchemaRegionCount = maximumSchemaRegionCount;
}
- public int getInitialDataRegionCount() {
- return initialDataRegionCount;
+ public int getMaximumDataRegionCount() {
+ return maximumDataRegionCount;
}
- public void setInitialDataRegionCount(int initialDataRegionCount) {
- this.initialDataRegionCount = initialDataRegionCount;
+ public void setMaximumDataRegionCount(int maximumDataRegionCount) {
+ this.maximumDataRegionCount = maximumDataRegionCount;
}
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 783bae94a2..d262637d59 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -174,8 +174,6 @@ public class ConfigNodeDescriptor {
conf.setSystemDir(properties.getProperty("system_dir",
conf.getSystemDir()));
- conf.setDataDirs(properties.getProperty("data_dirs",
conf.getDataDirs()[0]).split(","));
-
conf.setConsensusDir(properties.getProperty("consensus_dir",
conf.getConsensusDir()));
conf.setTimePartitionInterval(
@@ -193,16 +191,16 @@ public class ConfigNodeDescriptor {
properties.getProperty(
"data_replication_factor",
String.valueOf(conf.getDataReplicationFactor()))));
- conf.setInitialSchemaRegionCount(
+ conf.setMaximumSchemaRegionCount(
Integer.parseInt(
properties.getProperty(
- "initial_schema_region_count",
- String.valueOf(conf.getInitialSchemaRegionCount()))));
+ "maximum_schema_region_count",
+ String.valueOf(conf.getMaximumSchemaRegionCount()))));
- conf.setInitialDataRegionCount(
+ conf.setMaximumDataRegionCount(
Integer.parseInt(
properties.getProperty(
- "initial_data_region_count",
String.valueOf(conf.getInitialDataRegionCount()))));
+ "maximum_data_region_count",
String.valueOf(conf.getMaximumDataRegionCount()))));
// commons
commonDescriptor.loadCommonProps(properties);
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
index 8acdb93bc5..ca4713adf3 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
@@ -37,7 +37,6 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;
import java.util.Properties;
@@ -224,11 +223,6 @@ public class ConfigNodeStartupCheck {
systemProperties.setProperty(
"series_partition_executor_class",
conf.getSeriesPartitionExecutorClass());
- // Directory configuration
- systemProperties.setProperty("system_dir", conf.getSystemDir());
- systemProperties.setProperty("data_dirs", String.join(",",
conf.getDataDirs()));
- systemProperties.setProperty("consensus_dir", conf.getConsensusDir());
-
// ConfigNodeList
systemProperties.setProperty(
"confignode_list",
NodeUrlUtils.convertTConfigNodeUrls(conf.getConfigNodeList()));
@@ -243,28 +237,45 @@ public class ConfigNodeStartupCheck {
/** Ensure that special parameters are consistent with each startup except
the first one */
private void checkSystemProperties() throws ConfigurationException {
+ boolean needReWrite = false;
+
// Startup configuration
- String rpcAddress = systemProperties.getProperty("rpc_address");
- if (!rpcAddress.equals(conf.getRpcAddress())) {
+ String rpcAddress = systemProperties.getProperty("rpc_address", null);
+ if (rpcAddress == null) {
+ needReWrite = true;
+ } else if (!rpcAddress.equals(conf.getRpcAddress())) {
throw new ConfigurationException("rpc_address", conf.getRpcAddress(),
rpcAddress);
}
- int rpcPort = Integer.parseInt(systemProperties.getProperty("rpc_port"));
- if (rpcPort != conf.getRpcPort()) {
- throw new ConfigurationException(
- "rpc_port", String.valueOf(conf.getRpcPort()),
String.valueOf(rpcPort));
+ if (systemProperties.getProperty("rpc_port", null) == null) {
+ needReWrite = true;
+ } else {
+ int rpcPort = Integer.parseInt(systemProperties.getProperty("rpc_port"));
+ if (rpcPort != conf.getRpcPort()) {
+ throw new ConfigurationException(
+ "rpc_port", String.valueOf(conf.getRpcPort()),
String.valueOf(rpcPort));
+ }
}
- int consensusPort =
Integer.parseInt(systemProperties.getProperty("consensus_port"));
- if (consensusPort != conf.getConsensusPort()) {
- throw new ConfigurationException(
- "consensus_port", String.valueOf(conf.getConsensusPort()),
String.valueOf(consensusPort));
+ if (systemProperties.getProperty("consensus_port", null) == null) {
+ needReWrite = true;
+ } else {
+ int consensusPort =
Integer.parseInt(systemProperties.getProperty("consensus_port"));
+ if (consensusPort != conf.getConsensusPort()) {
+ throw new ConfigurationException(
+ "consensus_port",
+ String.valueOf(conf.getConsensusPort()),
+ String.valueOf(consensusPort));
+ }
}
// Consensus protocol configuration
String configNodeConsensusProtocolClass =
- systemProperties.getProperty("config_node_consensus_protocol_class");
- if
(!configNodeConsensusProtocolClass.equals(conf.getConfigNodeConsensusProtocolClass()))
{
+ systemProperties.getProperty("config_node_consensus_protocol_class",
null);
+ if (configNodeConsensusProtocolClass == null) {
+ needReWrite = true;
+ } else if (!configNodeConsensusProtocolClass.equals(
+ conf.getConfigNodeConsensusProtocolClass())) {
throw new ConfigurationException(
"config_node_consensus_protocol_class",
conf.getConfigNodeConsensusProtocolClass(),
@@ -272,8 +283,10 @@ public class ConfigNodeStartupCheck {
}
String dataNodeConsensusProtocolClass =
- systemProperties.getProperty("data_node_consensus_protocol_class");
- if
(!dataNodeConsensusProtocolClass.equals(conf.getDataNodeConsensusProtocolClass()))
{
+ systemProperties.getProperty("data_node_consensus_protocol_class",
null);
+ if (dataNodeConsensusProtocolClass == null) {
+ needReWrite = true;
+ } else if
(!dataNodeConsensusProtocolClass.equals(conf.getDataNodeConsensusProtocolClass()))
{
throw new ConfigurationException(
"data_node_consensus_protocol_class",
conf.getDataNodeConsensusProtocolClass(),
@@ -281,39 +294,34 @@ public class ConfigNodeStartupCheck {
}
// PartitionSlot configuration
- int seriesPartitionSlotNum =
-
Integer.parseInt(systemProperties.getProperty("series_partition_slot_num"));
- if (seriesPartitionSlotNum != conf.getSeriesPartitionSlotNum()) {
- throw new ConfigurationException(
- "series_partition_slot_num",
- String.valueOf(conf.getSeriesPartitionSlotNum()),
- String.valueOf(seriesPartitionSlotNum));
+ if (systemProperties.getProperty("series_partition_slot_num", null) ==
null) {
+ needReWrite = true;
+ } else {
+ int seriesPartitionSlotNum =
+
Integer.parseInt(systemProperties.getProperty("series_partition_slot_num"));
+ if (seriesPartitionSlotNum != conf.getSeriesPartitionSlotNum()) {
+ throw new ConfigurationException(
+ "series_partition_slot_num",
+ String.valueOf(conf.getSeriesPartitionSlotNum()),
+ String.valueOf(seriesPartitionSlotNum));
+ }
}
String seriesPartitionSlotExecutorClass =
- systemProperties.getProperty("series_partition_executor_class");
- if (!Objects.equals(seriesPartitionSlotExecutorClass,
conf.getSeriesPartitionExecutorClass())) {
+ systemProperties.getProperty("series_partition_executor_class", null);
+ if (seriesPartitionSlotExecutorClass == null) {
+ needReWrite = true;
+ } else if (!Objects.equals(
+ seriesPartitionSlotExecutorClass,
conf.getSeriesPartitionExecutorClass())) {
throw new ConfigurationException(
"series_partition_executor_class",
conf.getSeriesPartitionExecutorClass(),
seriesPartitionSlotExecutorClass);
}
- // Directory configuration
- String systemDir = systemProperties.getProperty("system_dir");
- if (!systemDir.equals(conf.getSystemDir())) {
- throw new ConfigurationException("system_dir", conf.getSystemDir(),
systemDir);
- }
-
- String[] dataDirs = systemProperties.getProperty("data_dirs").split(",");
- if (!Arrays.equals(dataDirs, conf.getDataDirs())) {
- throw new ConfigurationException(
- "data_dirs", String.join(",", conf.getDataDirs()), String.join(",",
dataDirs));
- }
-
- String consensusDir = systemProperties.getProperty("consensus_dir");
- if (!consensusDir.equals(conf.getConsensusDir())) {
- throw new ConfigurationException("consensus_dir",
conf.getConsensusDir(), consensusDir);
+ if (needReWrite) {
+ // Re-write special parameters if necessary
+ writeSystemProperties();
}
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionsReq.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionsReq.java
index b622d5d6d7..cc8b142721 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionsReq.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionsReq.java
@@ -19,49 +19,55 @@
package org.apache.iotdb.confignode.consensus.request.write;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Map;
import java.util.Objects;
+import java.util.TreeMap;
-/** Create regions for specific StorageGroup */
+/** Create regions for specific StorageGroups */
public class CreateRegionsReq extends ConfigRequest {
- private final List<TRegionReplicaSet> regionReplicaSets;
+ private final Map<String, TRegionReplicaSet> regionMap;
public CreateRegionsReq() {
super(ConfigRequestType.CreateRegions);
- this.regionReplicaSets = new ArrayList<>();
+ this.regionMap = new TreeMap<>();
}
- public void addRegion(TRegionReplicaSet regionReplicaSet) {
- this.regionReplicaSets.add(regionReplicaSet);
+ public Map<String, TRegionReplicaSet> getRegionMap() {
+ return regionMap;
}
- public List<TRegionReplicaSet> getRegionReplicaSets() {
- return regionReplicaSets;
+ public void addRegion(String storageGroup, TRegionReplicaSet
regionReplicaSet) {
+ regionMap.put(storageGroup, regionReplicaSet);
}
@Override
protected void serializeImpl(ByteBuffer buffer) {
buffer.putInt(ConfigRequestType.CreateRegions.ordinal());
- buffer.putInt(regionReplicaSets.size());
- for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
- ThriftCommonsSerDeUtils.serializeTRegionReplicaSet(regionReplicaSet,
buffer);
- }
+ buffer.putInt(regionMap.size());
+ regionMap.forEach(
+ (storageGroup, regionReplicaSet) -> {
+ BasicStructureSerDeUtil.write(storageGroup, buffer);
+ ThriftCommonsSerDeUtils.serializeTRegionReplicaSet(regionReplicaSet,
buffer);
+ });
}
@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
int length = buffer.getInt();
for (int i = 0; i < length; i++) {
-
regionReplicaSets.add(ThriftCommonsSerDeUtils.deserializeTRegionReplicaSet(buffer));
+ String storageGroup = BasicStructureSerDeUtil.readString(buffer);
+ TRegionReplicaSet regionReplicaSet =
+ ThriftCommonsSerDeUtils.deserializeTRegionReplicaSet(buffer);
+ regionMap.put(storageGroup, regionReplicaSet);
}
}
@@ -70,11 +76,11 @@ public class CreateRegionsReq extends ConfigRequest {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CreateRegionsReq that = (CreateRegionsReq) o;
- return regionReplicaSets.equals(that.regionReplicaSets);
+ return regionMap.equals(that.regionMap);
}
@Override
public int hashCode() {
- return Objects.hash(regionReplicaSets);
+ return Objects.hash(regionMap);
}
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/LoadManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/exception/NotEnoughDataNodeException.java
similarity index 68%
copy from
confignode/src/main/java/org/apache/iotdb/confignode/manager/LoadManager.java
copy to
confignode/src/main/java/org/apache/iotdb/confignode/exception/NotEnoughDataNodeException.java
index 39bf58b28d..e5d1639b84 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/LoadManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/exception/NotEnoughDataNodeException.java
@@ -16,19 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager;
+package org.apache.iotdb.confignode.exception;
-/**
- * The LoadBalancer at ConfigNodeGroup-Leader is active for cluster dynamic
load balancing
- * scheduling
- */
-public class LoadManager {
-
- private void metadataLoadBalance() {}
-
- private void dataLoadBalance() {}
-
- private void cleanDataSlice() {}
+public class NotEnoughDataNodeException extends ConfigNodeException {
- // TODO: Interfaces for active, interrupt and reset LoadBalancer
+ public NotEnoughDataNodeException() {
+ super("DataNode is not enough, please register more.");
+ }
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
index 696fddcfc4..d81330669b 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
@@ -20,17 +20,10 @@ package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.handlers.InitRegionHandler;
-import org.apache.iotdb.confignode.conf.ConfigNodeConf;
-import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.confignode.consensus.request.read.CountStorageGroupReq;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupReq;
-import org.apache.iotdb.confignode.consensus.request.write.CreateRegionsReq;
import
org.apache.iotdb.confignode.consensus.request.write.SetDataReplicationFactorReq;
import
org.apache.iotdb.confignode.consensus.request.write.SetSchemaReplicationFactorReq;
import org.apache.iotdb.confignode.consensus.request.write.SetStorageGroupReq;
@@ -39,34 +32,21 @@ import
org.apache.iotdb.confignode.consensus.request.write.SetTimePartitionInter
import org.apache.iotdb.confignode.consensus.response.CountStorageGroupResp;
import org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaResp;
import org.apache.iotdb.confignode.persistence.ClusterSchemaInfo;
-import org.apache.iotdb.confignode.persistence.NodeInfo;
-import org.apache.iotdb.confignode.persistence.PartitionInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
-import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Collections;
import java.util.List;
-import java.util.concurrent.CountDownLatch;
+/** The ClusterSchemaManager Manages cluster schema read and write requests. */
public class ClusterSchemaManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(ClusterSchemaManager.class);
- private static final ConfigNodeConf conf =
ConfigNodeDescriptor.getInstance().getConf();
- private static final int schemaReplicationFactor =
conf.getSchemaReplicationFactor();
- private static final int dataReplicationFactor =
conf.getDataReplicationFactor();
- private static final int initialSchemaRegionCount =
conf.getInitialSchemaRegionCount();
- private static final int initialDataRegionCount =
conf.getInitialDataRegionCount();
-
private static final ClusterSchemaInfo clusterSchemaInfo =
ClusterSchemaInfo.getInstance();
- private static final PartitionInfo partitionInfo =
PartitionInfo.getInstance();
private final Manager configManager;
@@ -75,142 +55,30 @@ public class ClusterSchemaManager {
}
/**
- * Set StorageGroup and allocate the default amount Regions
+ * Set StorageGroup
*
- * @return SUCCESS_STATUS if the StorageGroup is set and region allocation
successful.
- * NOT_ENOUGH_DATA_NODE if there are not enough DataNode for Region
allocation.
- * STORAGE_GROUP_ALREADY_EXISTS if the StorageGroup is already set.
+ * @return SUCCESS_STATUS if the StorageGroup is set successfully.
STORAGE_GROUP_ALREADY_EXISTS if
+ * the StorageGroup is already set. PERSISTENCE_FAILURE if fail to set
StorageGroup in
+ * MTreeAboveSG.
*/
public TSStatus setStorageGroup(SetStorageGroupReq setStorageGroupReq) {
TSStatus result;
- if (configManager.getDataNodeManager().getOnlineDataNodeCount()
- < Math.max(initialSchemaRegionCount, initialDataRegionCount)) {
- result = new TSStatus(TSStatusCode.NOT_ENOUGH_DATA_NODE.getStatusCode());
- result.setMessage("DataNode is not enough, please register more.");
+ if
(clusterSchemaInfo.containsStorageGroup(setStorageGroupReq.getSchema().getName()))
{
+ // Reject if StorageGroup already set
+ result = new
TSStatus(TSStatusCode.STORAGE_GROUP_ALREADY_EXISTS.getStatusCode());
+ result.setMessage(
+ String.format(
+ "StorageGroup %s is already set.",
setStorageGroupReq.getSchema().getName()));
} else {
- if
(clusterSchemaInfo.containsStorageGroup(setStorageGroupReq.getSchema().getName()))
{
- result = new
TSStatus(TSStatusCode.STORAGE_GROUP_ALREADY_EXISTS.getStatusCode());
- result.setMessage(
- String.format(
- "StorageGroup %s is already set.",
setStorageGroupReq.getSchema().getName()));
- } else {
- CreateRegionsReq createRegionsReq = new CreateRegionsReq();
-
- // Allocate default Regions
- allocateRegions(TConsensusGroupType.SchemaRegion, createRegionsReq,
setStorageGroupReq);
- allocateRegions(TConsensusGroupType.DataRegion, createRegionsReq,
setStorageGroupReq);
-
- // Create Regions in DataNode
- createRegions(
- setStorageGroupReq.getSchema().getName(),
- createRegionsReq,
- setStorageGroupReq.getSchema().getTTL());
-
- // Persist StorageGroup and Regions
- getConsensusManager().write(setStorageGroupReq);
- result = getConsensusManager().write(createRegionsReq).getStatus();
- }
+ // Persist StorageGroupSchema
+ result = getConsensusManager().write(setStorageGroupReq).getStatus();
}
return result;
}
- /** TODO: Allocate by LoadManager */
- private void allocateRegions(
- TConsensusGroupType type, CreateRegionsReq createRegionsReq,
SetStorageGroupReq setSGReq) {
-
- // TODO: Use CopySet algorithm to optimize region allocation policy
-
- int replicaCount =
- type.equals(TConsensusGroupType.SchemaRegion)
- ? schemaReplicationFactor
- : dataReplicationFactor;
- int regionCount =
- type.equals(TConsensusGroupType.SchemaRegion)
- ? initialSchemaRegionCount
- : initialDataRegionCount;
- List<TDataNodeLocation> onlineDataNodes =
getDataNodeInfoManager().getOnlineDataNodes();
- for (int i = 0; i < regionCount; i++) {
- Collections.shuffle(onlineDataNodes);
-
- TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet();
- TConsensusGroupId consensusGroupId =
- new TConsensusGroupId(type,
partitionInfo.generateNextRegionGroupId());
- regionReplicaSet.setRegionId(consensusGroupId);
- regionReplicaSet.setDataNodeLocations(
- new ArrayList<>(onlineDataNodes.subList(0, replicaCount)));
- createRegionsReq.addRegion(regionReplicaSet);
-
- switch (type) {
- case SchemaRegion:
- setSGReq.getSchema().addToSchemaRegionGroupIds(consensusGroupId);
- break;
- case DataRegion:
- setSGReq.getSchema().addToDataRegionGroupIds(consensusGroupId);
- }
- }
- }
-
- /** Create Regions on DataNode TODO: Async create Regions by LoadManager */
- private void createRegions(String storageGroup, CreateRegionsReq
createRegionsReq, long TTL) {
- int regionNum =
- initialSchemaRegionCount * schemaReplicationFactor
- + initialDataRegionCount * dataReplicationFactor;
- BitSet bitSet = new BitSet(regionNum);
- List<TEndPoint> schemaRegionEndPoints = new ArrayList<>();
- List<TEndPoint> dataRegionEndPoints = new ArrayList<>();
-
- for (int retry = 0; retry < 3; retry++) {
- int index = 0;
- CountDownLatch latch = new CountDownLatch(regionNum -
bitSet.cardinality());
- for (TRegionReplicaSet regionReplicaSet :
createRegionsReq.getRegionReplicaSets()) {
- for (TDataNodeLocation dataNodeLocation :
regionReplicaSet.getDataNodeLocations()) {
- TEndPoint endPoint =
- NodeInfo.getInstance()
- .getOnlineDataNode(dataNodeLocation.getDataNodeId())
- .getInternalEndPoint();
- InitRegionHandler handler = new InitRegionHandler(index, bitSet,
latch);
- switch (regionReplicaSet.getRegionId().getType()) {
- case SchemaRegion:
- if (retry == 0) {
- schemaRegionEndPoints.add(endPoint);
- }
- AsyncDataNodeClientPool.getInstance()
- .initSchemaRegion(
- endPoint, genCreateSchemaRegionReq(storageGroup,
regionReplicaSet), handler);
- break;
- case DataRegion:
- if (retry == 0) {
- dataRegionEndPoints.add(endPoint);
- }
- AsyncDataNodeClientPool.getInstance()
- .initDataRegion(
- endPoint,
- genCreateDataRegionReq(storageGroup, regionReplicaSet,
TTL),
- handler);
- }
- index += 1;
- }
- }
- try {
- latch.await();
- } catch (InterruptedException e) {
- LOGGER.error("ClusterSchemaManager was interrupted during create
Regions on DataNodes", e);
- }
- if (bitSet.cardinality() == regionNum) {
- break;
- }
- }
-
- if (bitSet.cardinality() < regionNum) {
- LOGGER.error("Can't create SchemaRegions and DataRegions on DataNodes.");
- } else {
- LOGGER.info("Successfully create SchemaRegions on DataNodes: {}",
schemaRegionEndPoints);
- LOGGER.info("Successfully create DataRegions on DataNodes: {}",
dataRegionEndPoints);
- }
- }
-
/**
- * Get the SchemaRegionGroupIds or DataRegionGroupIds from the specific
StorageGroup
+ * Only leader use this interface. Get the SchemaRegionGroupIds or
DataRegionGroupIds from the
+ * specific StorageGroup.
*
* @param storageGroup StorageGroupName
* @param type SchemaRegion or DataRegion
@@ -221,6 +89,18 @@ public class ClusterSchemaManager {
return clusterSchemaInfo.getRegionGroupIds(storageGroup, type);
}
+ /**
+ * Only leader use this interface.
+ *
+ * @param storageGroup StorageGroupName
+ * @return the matched StorageGroupSchema
+ * @throws MetadataException when the specific StorageGroup doesn't exist
+ */
+ public TStorageGroupSchema getStorageGroupSchemaByName(String storageGroup)
+ throws MetadataException {
+ return clusterSchemaInfo.getMatchedStorageGroupSchemaByName(storageGroup);
+ }
+
public TSStatus setTTL(SetTTLReq setTTLReq) {
// TODO: Inform DataNodes
return getConsensusManager().write(setTTLReq).getStatus();
@@ -266,31 +146,10 @@ public class ClusterSchemaManager {
return (StorageGroupSchemaResp) readResponse.getDataset();
}
- private TCreateSchemaRegionReq genCreateSchemaRegionReq(
- String storageGroup, TRegionReplicaSet regionReplicaSet) {
- TCreateSchemaRegionReq req = new TCreateSchemaRegionReq();
- req.setStorageGroup(storageGroup);
- req.setRegionReplicaSet(regionReplicaSet);
- return req;
- }
-
- private TCreateDataRegionReq genCreateDataRegionReq(
- String storageGroup, TRegionReplicaSet regionReplicaSet, long TTL) {
- TCreateDataRegionReq req = new TCreateDataRegionReq();
- req.setStorageGroup(storageGroup);
- req.setRegionReplicaSet(regionReplicaSet);
- req.setTtl(TTL);
- return req;
- }
-
public List<String> getStorageGroupNames() {
return clusterSchemaInfo.getStorageGroupNames();
}
- private NodeManager getDataNodeInfoManager() {
- return configManager.getDataNodeManager();
- }
-
private ConsensusManager getConsensusManager() {
return configManager.getConsensusManager();
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index d2ba96806a..fd5e44e379 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -80,12 +80,15 @@ public class ConfigManager implements Manager {
/** Manage cluster authorization */
private final PermissionManager permissionManager;
+ private final LoadManager loadManager;
+
public ConfigManager() throws IOException {
this.nodeManager = new NodeManager(this);
this.partitionManager = new PartitionManager(this);
this.clusterSchemaManager = new ClusterSchemaManager(this);
- this.consensusManager = new ConsensusManager();
this.permissionManager = new PermissionManager(this);
+ this.loadManager = new LoadManager(this);
+ this.consensusManager = new ConsensusManager();
}
public void close() throws IOException {
@@ -323,7 +326,7 @@ public class ConfigManager implements Manager {
}
@Override
- public NodeManager getDataNodeManager() {
+ public NodeManager getNodeManager() {
return nodeManager;
}
@@ -342,6 +345,11 @@ public class ConfigManager implements Manager {
return partitionManager;
}
+ @Override
+ public LoadManager getLoadManager() {
+ return loadManager;
+ }
+
@Override
public TSStatus operatePermission(ConfigRequest configRequest) {
TSStatus status = confirmLeader();
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/LoadManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/LoadManager.java
index 39bf58b28d..67aa1e7ffb 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/LoadManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/LoadManager.java
@@ -18,17 +18,223 @@
*/
package org.apache.iotdb.confignode.manager;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.confignode.client.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.handlers.CreateRegionHandler;
+import org.apache.iotdb.confignode.consensus.request.write.CreateRegionsReq;
+import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
+import org.apache.iotdb.confignode.manager.allocator.CopySetRegionAllocator;
+import org.apache.iotdb.confignode.manager.allocator.IRegionAllocator;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
/**
- * The LoadBalancer at ConfigNodeGroup-Leader is active for cluster dynamic
load balancing
- * scheduling
+ * The LoadManager at ConfigNodeGroup-Leader is active. It proactively
implements the cluster
+ * dynamic load balancing policy and passively accepts the PartitionTable
expansion request.
*/
public class LoadManager {
- private void metadataLoadBalance() {}
+ private static final Logger LOGGER =
LoggerFactory.getLogger(LoadManager.class);
- private void dataLoadBalance() {}
+ private final Manager configManager;
- private void cleanDataSlice() {}
+ private final IRegionAllocator regionAllocator;
// TODO: Interfaces for active, interrupt and reset LoadBalancer
+
+ public LoadManager(Manager configManager) {
+ this.configManager = configManager;
+ this.regionAllocator = new CopySetRegionAllocator();
+ }
+
+ /**
+ * Allocate and create one Region on DataNode for each StorageGroup.
+ *
+ * @param storageGroups List<StorageGroupName>
+ * @param consensusGroupType TConsensusGroupType of Region to be allocated
+ */
+ public void allocateAndCreateRegions(
+ List<String> storageGroups, TConsensusGroupType consensusGroupType)
+ throws NotEnoughDataNodeException {
+ CreateRegionsReq createRegionsReq = null;
+
+ // TODO: use procedure to protect create Regions process
+ try {
+ createRegionsReq = allocateRegions(storageGroups, consensusGroupType);
+ createRegionsOnDataNodes(createRegionsReq);
+ } catch (MetadataException e) {
+ LOGGER.error("Meet error when create Regions", e);
+ }
+
+ getConsensusManager().write(createRegionsReq);
+ }
+
+ private CreateRegionsReq allocateRegions(
+ List<String> storageGroups, TConsensusGroupType consensusGroupType)
+ throws NotEnoughDataNodeException, MetadataException {
+ CreateRegionsReq createRegionsReq = new CreateRegionsReq();
+
+ List<TDataNodeLocation> onlineDataNodes =
getNodeManager().getOnlineDataNodes();
+ List<TRegionReplicaSet> allocatedRegions =
getPartitionManager().getAllocatedRegions();
+
+ for (String storageGroup : storageGroups) {
+ TStorageGroupSchema storageGroupSchema =
+ getClusterSchemaManager().getStorageGroupSchemaByName(storageGroup);
+ int replicationFactor =
+ consensusGroupType == TConsensusGroupType.SchemaRegion
+ ? storageGroupSchema.getSchemaReplicationFactor()
+ : storageGroupSchema.getDataReplicationFactor();
+
+ if (onlineDataNodes.size() < replicationFactor) {
+ throw new NotEnoughDataNodeException();
+ }
+
+ TRegionReplicaSet newRegion =
+ regionAllocator.allocateRegion(
+ onlineDataNodes,
+ allocatedRegions,
+ replicationFactor,
+ new TConsensusGroupId(
+ consensusGroupType,
getPartitionManager().generateNextRegionGroupId()));
+ createRegionsReq.addRegion(storageGroup, newRegion);
+ }
+
+ return createRegionsReq;
+ }
+
+ private void createRegionsOnDataNodes(CreateRegionsReq createRegionsReq)
+ throws MetadataException {
+ // Index of each Region replica (one slot per DataNode location)
+ int index = 0;
+ // Number of Region replicas to be created
+ int regionNum = 0;
+ Map<String, Map<Integer, Integer>> indexMap = new HashMap<>();
+ Map<String, Long> ttlMap = new HashMap<>();
+ for (Map.Entry<String, TRegionReplicaSet> entry :
createRegionsReq.getRegionMap().entrySet()) {
+ regionNum += entry.getValue().getDataNodeLocationsSize();
+ ttlMap.put(
+ entry.getKey(),
+
getClusterSchemaManager().getStorageGroupSchemaByName(entry.getKey()).getTTL());
+ for (TDataNodeLocation dataNodeLocation :
entry.getValue().getDataNodeLocations()) {
+ indexMap
+ .computeIfAbsent(entry.getKey(), sg -> new HashMap<>())
+ .put(dataNodeLocation.getDataNodeId(), index);
+ index += 1;
+ }
+ }
+
+ BitSet bitSet = new BitSet(regionNum);
+
+ for (int retry = 0; retry < 3; retry++) {
+ CountDownLatch latch = new CountDownLatch(regionNum -
bitSet.cardinality());
+
+ createRegionsReq
+ .getRegionMap()
+ .forEach(
+ (storageGroup, regionReplicaSet) -> {
+ // Enumerate each replica of the Region
+ regionReplicaSet
+ .getDataNodeLocations()
+ .forEach(
+ dataNodeLocation -> {
+ // Skip those created successfully
+ if (!bitSet.get(
+
indexMap.get(storageGroup).get(dataNodeLocation.getDataNodeId()))) {
+ TEndPoint endPoint =
dataNodeLocation.getInternalEndPoint();
+ CreateRegionHandler handler =
+ new CreateRegionHandler(
+ indexMap
+ .get(storageGroup)
+ .get(dataNodeLocation.getDataNodeId()),
+ bitSet,
+ latch,
+ regionReplicaSet.getRegionId(),
+ dataNodeLocation);
+
+ switch (regionReplicaSet.getRegionId().getType()) {
+ case SchemaRegion:
+ AsyncDataNodeClientPool.getInstance()
+ .createSchemaRegion(
+ endPoint,
+ genCreateSchemaRegionReq(storageGroup,
regionReplicaSet),
+ handler);
+ break;
+ case DataRegion:
+ AsyncDataNodeClientPool.getInstance()
+ .createDataRegion(
+ endPoint,
+ genCreateDataRegionReq(
+ storageGroup,
+ regionReplicaSet,
+ ttlMap.get(storageGroup)),
+ handler);
+ }
+ }
+ });
+ });
+
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ LOGGER.error("ClusterSchemaManager was interrupted during create
Regions on DataNodes", e);
+ }
+
+ if (bitSet.cardinality() == regionNum) {
+ break;
+ }
+ }
+
+ if (bitSet.cardinality() < regionNum) {
+ LOGGER.error(
+ "Failed to create some SchemaRegions or DataRegions on DataNodes.
Please check former logs.");
+ }
+ }
+
+ private TCreateSchemaRegionReq genCreateSchemaRegionReq(
+ String storageGroup, TRegionReplicaSet regionReplicaSet) {
+ TCreateSchemaRegionReq req = new TCreateSchemaRegionReq();
+ req.setStorageGroup(storageGroup);
+ req.setRegionReplicaSet(regionReplicaSet);
+ return req;
+ }
+
+ private TCreateDataRegionReq genCreateDataRegionReq(
+ String storageGroup, TRegionReplicaSet regionReplicaSet, long TTL) {
+ TCreateDataRegionReq req = new TCreateDataRegionReq();
+ req.setStorageGroup(storageGroup);
+ req.setRegionReplicaSet(regionReplicaSet);
+ req.setTtl(TTL);
+ return req;
+ }
+
+ private ConsensusManager getConsensusManager() {
+ return configManager.getConsensusManager();
+ }
+
+ private NodeManager getNodeManager() {
+ return configManager.getNodeManager();
+ }
+
+ private ClusterSchemaManager getClusterSchemaManager() {
+ return configManager.getClusterSchemaManager();
+ }
+
+ private PartitionManager getPartitionManager() {
+ return configManager.getPartitionManager();
+ }
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
index cf1d14103b..960d9cfbe4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
@@ -57,7 +57,7 @@ public interface Manager {
*
* @return DataNodeManager instance
*/
- NodeManager getDataNodeManager();
+ NodeManager getNodeManager();
/**
* Get ConsensusManager
@@ -80,6 +80,13 @@ public interface Manager {
*/
PartitionManager getPartitionManager();
+ /**
+ * Get LoadManager
+ *
+ * @return LoadManager instance
+ */
+ LoadManager getLoadManager();
+
/**
* Register DataNode
*
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index 47d3aa7d77..b5d778e8b5 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -42,7 +42,7 @@ import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
-/** Manage cluster node information and process node addition and removal
requests */
+/** NodeManager manages cluster node addition and removal requests */
public class NodeManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(NodeManager.class);
@@ -117,10 +117,25 @@ public class NodeManager {
return nodeInfo.getOnlineDataNodeCount();
}
+ /**
+ * Only leader use this interface.
+ *
+ * @return all online DataNodes
+ */
public List<TDataNodeLocation> getOnlineDataNodes() {
return nodeInfo.getOnlineDataNodes();
}
+ /**
+ * Only leader use this interface.
+ *
+ * @param dataNodeId the specific DataNodeId
+ * @return the specific DataNodeLocation
+ */
+ public TDataNodeLocation getOnlineDataNode(int dataNodeId) {
+ return nodeInfo.getOnlineDataNode(dataNodeId);
+ }
+
/**
* Provides ConfigNodeGroup information for the newly registered ConfigNode
*
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index ce2716cc29..956cc3f507 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -18,8 +18,10 @@
*/
package org.apache.iotdb.confignode.manager;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
@@ -33,10 +35,11 @@ import
org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionRe
import
org.apache.iotdb.confignode.consensus.request.write.CreateSchemaPartitionReq;
import org.apache.iotdb.confignode.consensus.response.DataPartitionResp;
import org.apache.iotdb.confignode.consensus.response.SchemaPartitionResp;
-import org.apache.iotdb.confignode.persistence.ClusterSchemaInfo;
+import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
import org.apache.iotdb.confignode.persistence.PartitionInfo;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,27 +50,22 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
-/** manage data partition and schema partition */
+/** The PartitionManager manages cluster PartitionTable read and write
requests. */
public class PartitionManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(PartitionManager.class);
- private static final ClusterSchemaInfo clusterSchemaInfo =
ClusterSchemaInfo.getInstance();
private static final PartitionInfo partitionInfo =
PartitionInfo.getInstance();
- private final Manager configNodeManager;
+ private final Manager configManager;
private SeriesPartitionExecutor executor;
- public PartitionManager(Manager configNodeManager) {
- this.configNodeManager = configNodeManager;
+ public PartitionManager(Manager configManager) {
+ this.configManager = configManager;
setSeriesPartitionExecutor();
}
- private ConsensusManager getConsensusManager() {
- return configNodeManager.getConsensusManager();
- }
-
/**
* Get SchemaPartition
*
@@ -85,13 +83,26 @@ public class PartitionManager {
* Get SchemaPartition and create a new one if it does not exist
*
* @param physicalPlan SchemaPartitionPlan with partitionSlotsMap
- * @return SchemaPartitionDataSet
+ * @return SchemaPartitionResp with SchemaPartition and TSStatus.
SUCCESS_STATUS if all processes
+ * finish. NOT_ENOUGH_DATA_NODE if there are not enough DataNodes to create
new Regions.
*/
public DataSet getOrCreateSchemaPartition(GetOrCreateSchemaPartitionReq
physicalPlan) {
Map<String, List<TSeriesPartitionSlot>> noAssignedSchemaPartitionSlots =
partitionInfo.filterNoAssignedSchemaPartitionSlots(physicalPlan.getPartitionSlotsMap());
if (noAssignedSchemaPartitionSlots.size() > 0) {
+
+ // Make sure each StorageGroup has at least one SchemaRegion
+ try {
+ checkAndAllocateRegionsIfNecessary(
+ new ArrayList<>(noAssignedSchemaPartitionSlots.keySet()),
+ TConsensusGroupType.SchemaRegion);
+ } catch (NotEnoughDataNodeException e) {
+ SchemaPartitionResp resp = new SchemaPartitionResp();
+ resp.setStatus(new
TSStatus(TSStatusCode.NOT_ENOUGH_DATA_NODE.getStatusCode()));
+ return resp;
+ }
+
// Allocate SchemaPartition
Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>>
assignedSchemaPartition =
allocateSchemaPartition(noAssignedSchemaPartitionSlots);
@@ -100,6 +111,8 @@ public class PartitionManager {
CreateSchemaPartitionReq createPlan = new CreateSchemaPartitionReq();
createPlan.setAssignedSchemaPartition(assignedSchemaPartition);
getConsensusManager().write(createPlan);
+
+ // TODO: Allocate more Regions if necessary
}
return getSchemaPartition(physicalPlan);
@@ -120,7 +133,8 @@ public class PartitionManager {
noAssignedSchemaPartitionSlotsMap.get(storageGroup);
List<TRegionReplicaSet> schemaRegionReplicaSets =
partitionInfo.getRegionReplicaSets(
- clusterSchemaInfo.getRegionGroupIds(storageGroup,
TConsensusGroupType.SchemaRegion));
+ getClusterSchemaManager()
+ .getRegionGroupIds(storageGroup,
TConsensusGroupType.SchemaRegion));
Random random = new Random();
Map<TSeriesPartitionSlot, TRegionReplicaSet> allocateResult = new
HashMap<>();
@@ -155,13 +169,25 @@ public class PartitionManager {
*
* @param physicalPlan DataPartitionPlan with Map<StorageGroupName,
Map<SeriesPartitionSlot,
* List<TimePartitionSlot>>>
- * @return DataPartitionDataSet
+ * @return DataPartitionResp with DataPartition and TSStatus. SUCCESS_STATUS
if all processes
+ * finish. NOT_ENOUGH_DATA_NODE if there are not enough DataNodes to create
new Regions.
*/
public DataSet getOrCreateDataPartition(GetOrCreateDataPartitionReq
physicalPlan) {
Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
noAssignedDataPartitionSlots =
partitionInfo.filterNoAssignedDataPartitionSlots(physicalPlan.getPartitionSlotsMap());
if (noAssignedDataPartitionSlots.size() > 0) {
+
+ // Make sure each StorageGroup has at least one DataRegion
+ try {
+ checkAndAllocateRegionsIfNecessary(
+ new ArrayList<>(noAssignedDataPartitionSlots.keySet()),
TConsensusGroupType.DataRegion);
+ } catch (NotEnoughDataNodeException e) {
+ DataPartitionResp resp = new DataPartitionResp();
+ resp.setStatus(new
TSStatus(TSStatusCode.NOT_ENOUGH_DATA_NODE.getStatusCode()));
+ return resp;
+ }
+
// Allocate DataPartition
Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>>>
assignedDataPartition =
allocateDataPartition(noAssignedDataPartitionSlots);
@@ -170,6 +196,8 @@ public class PartitionManager {
CreateDataPartitionReq createPlan = new CreateDataPartitionReq();
createPlan.setAssignedDataPartition(assignedDataPartition);
getConsensusManager().write(createPlan);
+
+ // TODO: Allocate more Regions if necessary
}
return getDataPartition(physicalPlan);
@@ -196,7 +224,8 @@ public class PartitionManager {
noAssignedDataPartitionSlotsMap.get(storageGroup);
List<TRegionReplicaSet> dataRegionEndPoints =
partitionInfo.getRegionReplicaSets(
- clusterSchemaInfo.getRegionGroupIds(storageGroup,
TConsensusGroupType.DataRegion));
+ getClusterSchemaManager()
+ .getRegionGroupIds(storageGroup,
TConsensusGroupType.DataRegion));
Random random = new Random();
Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>> allocateResult =
@@ -217,6 +246,25 @@ public class PartitionManager {
return result;
}
+ private void checkAndAllocateRegionsIfNecessary(
+ List<String> storageGroups, TConsensusGroupType consensusGroupType)
+ throws NotEnoughDataNodeException {
+ List<String> storageGroupWithoutRegion = new ArrayList<>();
+ for (String storageGroup : storageGroups) {
+ List<TConsensusGroupId> groupIds =
+ getClusterSchemaManager().getRegionGroupIds(storageGroup,
consensusGroupType);
+ if (groupIds.size() == 0) {
+ storageGroupWithoutRegion.add(storageGroup);
+ }
+ }
+ getLoadManager().allocateAndCreateRegions(storageGroupWithoutRegion,
consensusGroupType);
+ }
+
+ /** Get all allocated RegionReplicaSets */
+ public List<TRegionReplicaSet> getAllocatedRegions() {
+ return partitionInfo.getAllocatedRegions();
+ }
+
/** Construct SeriesPartitionExecutor by iotdb-confignode.properties */
private void setSeriesPartitionExecutor() {
ConfigNodeConf conf = ConfigNodeDescriptor.getInstance().getConf();
@@ -234,4 +282,25 @@ public class PartitionManager {
public TSeriesPartitionSlot getSeriesPartitionSlot(String devicePath) {
return executor.getSeriesPartitionSlot(devicePath);
}
+
+ /**
+ * Only leader use this interface.
+ *
+ * @return the next RegionGroupId
+ */
+ public int generateNextRegionGroupId() {
+ return partitionInfo.generateNextRegionGroupId();
+ }
+
+ private ConsensusManager getConsensusManager() {
+ return configManager.getConsensusManager();
+ }
+
+ private ClusterSchemaManager getClusterSchemaManager() {
+ return configManager.getClusterSchemaManager();
+ }
+
+ private LoadManager getLoadManager() {
+ return configManager.getLoadManager();
+ }
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/allocator/CopySetRegionAllocator.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/allocator/CopySetRegionAllocator.java
new file mode 100644
index 0000000000..00da9dc05f
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/allocator/CopySetRegionAllocator.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.allocator;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Allocate Region by CopySet algorithm. Reference:
+ *
https://www.usenix.org/conference/atc13/technical-sessions/presentation/cidon
+ */
+public class CopySetRegionAllocator implements IRegionAllocator {
+
+ private static final int maximumRandomNum = 10;
+
+ private int maxId = 0;
+ private int intersectionSize = 0;
+ private List<TDataNodeLocation> weightList;
+
+ public CopySetRegionAllocator() {
+ // Empty constructor
+ }
+
+ @Override
+ public TRegionReplicaSet allocateRegion(
+ List<TDataNodeLocation> onlineDataNodes,
+ List<TRegionReplicaSet> allocatedRegions,
+ int replicationFactor,
+ TConsensusGroupId consensusGroupId) {
+ TRegionReplicaSet result = null;
+
+ // Build weightList for weighted random
+ buildWeightList(onlineDataNodes, allocatedRegions);
+
+ boolean accepted = false;
+ while (true) {
+ for (int retry = 0; retry < maximumRandomNum; retry++) {
+ result = genWeightedRandomRegion(replicationFactor);
+ if (intersectionCheck(allocatedRegions, result)) {
+ accepted = true;
+ break;
+ }
+ }
+ if (accepted) {
+ break;
+ }
+ intersectionSize += 1;
+ }
+
+ clear();
+ result.setRegionId(consensusGroupId);
+ return result;
+ }
+
+ private void buildWeightList(
+ List<TDataNodeLocation> onlineDataNodes, List<TRegionReplicaSet>
allocatedRegions) {
+ int maximumRegionNum = 0;
+ Map<TDataNodeLocation, Integer> countMap = new HashMap<>();
+ for (TDataNodeLocation dataNodeLocation : onlineDataNodes) {
+ maxId = Math.max(maxId, dataNodeLocation.getDataNodeId());
+ countMap.put(dataNodeLocation, 0);
+ }
+ for (TRegionReplicaSet regionReplicaSet : allocatedRegions) {
+ for (TDataNodeLocation dataNodeLocation :
regionReplicaSet.getDataNodeLocations()) {
+ countMap.computeIfPresent(dataNodeLocation, (dataNode, count) ->
(count + 1));
+ maximumRegionNum = Math.max(maximumRegionNum,
countMap.get(dataNodeLocation));
+ }
+ }
+
+ weightList = new ArrayList<>();
+ for (Map.Entry<TDataNodeLocation, Integer> countEntry :
countMap.entrySet()) {
+ int weight = maximumRegionNum - countEntry.getValue() + 1;
+ // Add one copy of the DataNode per weight unit so shuffling yields a weighted random pick
+ for (int repeat = 0; repeat < weight; repeat++) {
+ weightList.add(countEntry.getKey().deepCopy());
+ }
+ }
+ }
+
+ private TRegionReplicaSet genWeightedRandomRegion(int replicationFactor) {
+ Set<Integer> checkSet = new HashSet<>();
+ TRegionReplicaSet randomRegion = new TRegionReplicaSet();
+ Collections.shuffle(weightList);
+
+ for (TDataNodeLocation dataNodeLocation : weightList) {
+ if (checkSet.contains(dataNodeLocation.getDataNodeId())) {
+ continue;
+ }
+
+ checkSet.add(dataNodeLocation.getDataNodeId());
+ randomRegion.addToDataNodeLocations(dataNodeLocation);
+
+ if (randomRegion.getDataNodeLocationsSize() == replicationFactor) {
+ break;
+ }
+ }
+
+ return randomRegion;
+ }
+
+ private boolean intersectionCheck(
+ List<TRegionReplicaSet> allocatedRegions, TRegionReplicaSet newRegion) {
+ BitSet newBit = new BitSet(maxId + 1);
+ for (TDataNodeLocation dataNodeLocation :
newRegion.getDataNodeLocations()) {
+ newBit.set(dataNodeLocation.getDataNodeId());
+ }
+
+ for (TRegionReplicaSet allocatedRegion : allocatedRegions) {
+ BitSet allocatedBit = new BitSet(maxId + 1);
+ for (TDataNodeLocation dataNodeLocation :
allocatedRegion.getDataNodeLocations()) {
+ allocatedBit.set(dataNodeLocation.getDataNodeId());
+ }
+
+ allocatedBit.and(newBit);
+ if (allocatedBit.cardinality() > intersectionSize) {
+ // Reject: overlapping an existing Region by more than intersectionSize
would reduce scatter width and raise the risk of simultaneous data loss
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private void clear() {
+ maxId = 0;
+ intersectionSize = 0;
+ weightList.clear();
+ weightList = null;
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/allocator/IRegionAllocator.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/allocator/IRegionAllocator.java
new file mode 100644
index 0000000000..4874f5a18c
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/allocator/IRegionAllocator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.allocator;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+import java.util.List;
+
+public interface IRegionAllocator {
+
+ /**
+ * Calculates the next optimal TRegionReplicaSet based on the currently
online DataNodes and
+ * allocated Regions.
+ *
+ * @param onlineDataNodes DataNodes that are currently communicable
+ * @param allocatedRegions Allocated Regions
+ * @param replicationFactor Replication factor of TRegionReplicaSet
+ * @param consensusGroupId TConsensusGroupId of result TRegionReplicaSet
+ * @return The optimal TRegionReplicaSet derived by the specific algorithm
+ */
+ TRegionReplicaSet allocateRegion(
+ List<TDataNodeLocation> onlineDataNodes,
+ List<TRegionReplicaSet> allocatedRegions,
+ int replicationFactor,
+ TConsensusGroupId consensusGroupId);
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/LoadManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/balancer/RegionBalancer.java
similarity index 68%
copy from
confignode/src/main/java/org/apache/iotdb/confignode/manager/LoadManager.java
copy to
confignode/src/main/java/org/apache/iotdb/confignode/manager/balancer/RegionBalancer.java
index 39bf58b28d..bf7e24527b 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/LoadManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/balancer/RegionBalancer.java
@@ -16,19 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager;
+package org.apache.iotdb.confignode.manager.balancer;
-/**
- * The LoadBalancer at ConfigNodeGroup-Leader is active for cluster dynamic
load balancing
- * scheduling
- */
-public class LoadManager {
-
- private void metadataLoadBalance() {}
-
- private void dataLoadBalance() {}
-
- private void cleanDataSlice() {}
-
- // TODO: Interfaces for active, interrupt and reset LoadBalancer
-}
+public class RegionBalancer {}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/LoadManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/balancer/SeriesPartitionSlotBalancer.java
similarity index 68%
copy from
confignode/src/main/java/org/apache/iotdb/confignode/manager/LoadManager.java
copy to
confignode/src/main/java/org/apache/iotdb/confignode/manager/balancer/SeriesPartitionSlotBalancer.java
index 39bf58b28d..441f732264 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/LoadManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/balancer/SeriesPartitionSlotBalancer.java
@@ -16,19 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager;
+package org.apache.iotdb.confignode.manager.balancer;
-/**
- * The LoadBalancer at ConfigNodeGroup-Leader is active for cluster dynamic
load balancing
- * scheduling
- */
-public class LoadManager {
-
- private void metadataLoadBalance() {}
-
- private void dataLoadBalance() {}
-
- private void cleanDataSlice() {}
-
- // TODO: Interfaces for active, interrupt and reset LoadBalancer
-}
+public class SeriesPartitionSlotBalancer {}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfo.java
index 3a58068ef1..da44a8d325 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfo.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfo.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.persistence;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
@@ -27,6 +28,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.consensus.request.read.CountStorageGroupReq;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupReq;
+import org.apache.iotdb.confignode.consensus.request.write.CreateRegionsReq;
import
org.apache.iotdb.confignode.consensus.request.write.DeleteStorageGroupReq;
import
org.apache.iotdb.confignode.consensus.request.write.SetDataReplicationFactorReq;
import
org.apache.iotdb.confignode.consensus.request.write.SetSchemaReplicationFactorReq;
@@ -56,6 +58,10 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+/**
+ * The ClusterSchemaInfo stores the cluster schema. The cluster schema
includes: 1. StorageGroupSchema
+ * 2. Template (Not implemented yet)
+ */
public class ClusterSchemaInfo implements SnapshotProcessor {
private static final Logger LOGGER =
LoggerFactory.getLogger(ClusterSchemaInfo.class);
@@ -84,7 +90,8 @@ public class ClusterSchemaInfo implements SnapshotProcessor {
* Persistence new StorageGroupSchema
*
* @param req SetStorageGroupReq
- * @return SUCCESS_STATUS
+ * @return SUCCESS_STATUS if the StorageGroup is set successfully.
PERSISTENCE_FAILURE if it fails
+ * to set the StorageGroup in MTreeAboveSG.
*/
public TSStatus setStorageGroup(SetStorageGroupReq req) {
TSStatus result = new TSStatus();
@@ -106,7 +113,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor
{
} catch (MetadataException e) {
LOGGER.error("Error StorageGroup name", e);
result
- .setCode(TSStatusCode.STORAGE_GROUP_NOT_EXIST.getStatusCode())
+ .setCode(TSStatusCode.PERSISTENCE_FAILURE.getStatusCode())
.setMessage("Error StorageGroup name");
} finally {
storageGroupReadWriteLock.writeLock().unlock();
@@ -140,6 +147,42 @@ public class ClusterSchemaInfo implements
SnapshotProcessor {
return result;
}
+ /**
+ * Persist new RegionGroupIds on the specific StorageGroupSchema
+ *
+ * @param req CreateRegionsReq
+ * @return SUCCESS_STATUS if the new RegionGroupIds are persisted
successfully.
+ * PERSISTENCE_FAILURE if it fails to find the StorageGroup in MTreeAboveSG.
+ */
+ public TSStatus createRegions(CreateRegionsReq req) {
+ TSStatus result = new TSStatus();
+ storageGroupReadWriteLock.writeLock().lock();
+
+ try {
+ for (Map.Entry<String, TRegionReplicaSet> reqEntry :
req.getRegionMap().entrySet()) {
+ PartialPath partialPathName = new PartialPath(reqEntry.getKey());
+ TStorageGroupSchema storageGroupSchema =
+
mTree.getStorageGroupNodeByStorageGroupPath(partialPathName).getStorageGroupSchema();
+ switch (reqEntry.getValue().getRegionId().getType()) {
+ case SchemaRegion:
+
storageGroupSchema.getSchemaRegionGroupIds().add(reqEntry.getValue().getRegionId());
+ break;
+ case DataRegion:
+
storageGroupSchema.getDataRegionGroupIds().add(reqEntry.getValue().getRegionId());
+ break;
+ }
+ }
+ result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ } catch (MetadataException e) {
+ LOGGER.error("Error StorageGroup name", e);
+ result.setCode(TSStatusCode.PERSISTENCE_FAILURE.getStatusCode());
+ } finally {
+ storageGroupReadWriteLock.writeLock().unlock();
+ }
+
+ return result;
+ }
+
public TSStatus setTTL(SetTTLReq req) {
TSStatus result = new TSStatus();
storageGroupReadWriteLock.writeLock().lock();
@@ -316,7 +359,26 @@ public class ClusterSchemaInfo implements
SnapshotProcessor {
}
/**
- * Get the SchemaRegionGroupIds or DataRegionGroupIds from the specific
StorageGroup
+ * Get the specific StorageGroupSchema
+ *
+ * @param storageGroup StorageGroupName
+ * @return The specific StorageGroupSchema
+ * @throws MetadataException from MTree
+ */
+ public TStorageGroupSchema getMatchedStorageGroupSchemaByName(String
storageGroup)
+ throws MetadataException {
+ storageGroupReadWriteLock.readLock().lock();
+ try {
+ return mTree
+ .getStorageGroupNodeByStorageGroupPath(new PartialPath(storageGroup))
+ .getStorageGroupSchema();
+ } finally {
+ storageGroupReadWriteLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Get the SchemaRegionGroupIds or DataRegionGroupIds from the specific
StorageGroup.
*
* @param storageGroup StorageGroupName
* @param type SchemaRegion or DataRegion
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
index 49f0a9af0f..38af6e207f 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
@@ -52,6 +52,10 @@ import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+/**
+ * The NodeInfo stores cluster node information. The cluster node information
includes: 1. DataNode
+ * information 2. ConfigNode information
+ */
public class NodeInfo {
private static final Logger LOGGER = LoggerFactory.getLogger(NodeInfo.class);
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfo.java
index 2d41b86329..5e61c3133c 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfo.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfo.java
@@ -63,7 +63,11 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-/** manage data partition and schema partition */
+/**
+ * The PartitionInfo stores the cluster PartitionTable. The PartitionTable
includes: 1. regionMap:
+ * location of Region member 2. schemaPartition: location of schema 3.
dataPartition: location of
+ * data
+ */
public class PartitionInfo implements SnapshotProcessor {
private static final Logger LOGGER =
LoggerFactory.getLogger(PartitionInfo.class);
@@ -125,7 +129,7 @@ public class PartitionInfo implements SnapshotProcessor {
try {
int maxRegionId = Integer.MIN_VALUE;
- for (TRegionReplicaSet regionReplicaSet : req.getRegionReplicaSets()) {
+ for (TRegionReplicaSet regionReplicaSet : req.getRegionMap().values()) {
regionMap.put(regionReplicaSet.getRegionId(), regionReplicaSet);
maxRegionId = Math.max(maxRegionId,
regionReplicaSet.getRegionId().getId());
}
@@ -133,7 +137,7 @@ public class PartitionInfo implements SnapshotProcessor {
if (nextRegionGroupId.get() < maxRegionId) {
// In this case, at least one Region is created with the leader node,
// so the nextRegionGroupID of the followers needs to be added
- nextRegionGroupId.getAndAdd(req.getRegionReplicaSets().size());
+ nextRegionGroupId.getAndAdd(req.getRegionMap().size());
}
result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
@@ -321,6 +325,18 @@ public class PartitionInfo implements SnapshotProcessor {
return result;
}
+ /** Get all allocated RegionReplicaSets */
+ public List<TRegionReplicaSet> getAllocatedRegions() {
+ List<TRegionReplicaSet> result;
+ regionReadWriteLock.readLock().lock();
+ try {
+ result = new ArrayList<>(regionMap.values());
+ } finally {
+ regionReadWriteLock.readLock().unlock();
+ }
+ return result;
+ }
+
public boolean processTakeSnapshot(File snapshotDir) throws TException,
IOException {
File snapshotFile = new File(snapshotDir, snapshotFileName);
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
index 5d5a115139..2eb9a61392 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
@@ -46,6 +46,7 @@ import org.apache.iotdb.confignode.persistence.NodeInfo;
import org.apache.iotdb.confignode.persistence.PartitionInfo;
import org.apache.iotdb.confignode.persistence.SnapshotProcessor;
import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -126,6 +127,10 @@ public class ConfigRequestExecutor {
case SetTimePartitionInterval:
return
clusterSchemaInfo.setTimePartitionInterval((SetTimePartitionIntervalReq) req);
case CreateRegions:
+ TSStatus status = clusterSchemaInfo.createRegions((CreateRegionsReq)
req);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return status;
+ }
return partitionInfo.createRegions((CreateRegionsReq) req);
case DeleteRegions:
return partitionInfo.deleteRegions((DeleteRegionsReq) req);
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 6b62898103..0ee2212685 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -143,7 +143,16 @@ public class ConfigNodeRPCServiceProcessor implements
ConfigIService.Iface {
storageGroupSchema.setTimePartitionInterval(
ConfigNodeDescriptor.getInstance().getConf().getTimePartitionInterval());
}
+ if (!storageGroupSchema.isSetMaximumSchemaRegionCount()) {
+ storageGroupSchema.setMaximumSchemaRegionCount(
+
ConfigNodeDescriptor.getInstance().getConf().getMaximumSchemaRegionCount());
+ }
+ if (!storageGroupSchema.isSetMaximumDataRegionCount()) {
+ storageGroupSchema.setMaximumDataRegionCount(
+
ConfigNodeDescriptor.getInstance().getConf().getMaximumDataRegionCount());
+ }
+ // Initialize RegionGroupId List
storageGroupSchema.setSchemaRegionGroupIds(new ArrayList<>());
storageGroupSchema.setDataRegionGroupIds(new ArrayList<>());
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java
index a1f104697a..4a0b56262e 100644
---
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java
@@ -200,12 +200,12 @@ public class ConfigRequestSerDeTest {
TRegionReplicaSet dataRegionSet = new TRegionReplicaSet();
dataRegionSet.setRegionId(new
TConsensusGroupId(TConsensusGroupType.DataRegion, 0));
dataRegionSet.setDataNodeLocations(Collections.singletonList(dataNodeLocation));
- req0.addRegion(dataRegionSet);
+ req0.addRegion("root.sg0", dataRegionSet);
TRegionReplicaSet schemaRegionSet = new TRegionReplicaSet();
schemaRegionSet.setRegionId(new
TConsensusGroupId(TConsensusGroupType.SchemaRegion, 1));
schemaRegionSet.setDataNodeLocations(Collections.singletonList(dataNodeLocation));
- req0.addRegion(schemaRegionSet);
+ req0.addRegion("root.sg1", schemaRegionSet);
req0.serialize(buffer);
buffer.flip();
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
index ffbd52aa6b..6e120afd86 100644
---
a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
@@ -95,7 +95,7 @@ public class PartitionInfoTest {
generateTRegionReplicaSet(
testFlag.RegionReplica.getFlag(),
generateTConsensusGroupId(testFlag.RegionReplica.getFlag()));
- createRegionsReq.addRegion(tRegionReplicaSet);
+ createRegionsReq.addRegion("root.test", tRegionReplicaSet);
partitionInfo.createRegions(createRegionsReq);
CreateSchemaPartitionReq createSchemaPartitionReq =
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
index fa35800609..6396c9f7d2 100644
---
a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
@@ -193,16 +193,11 @@ public class ConfigNodeRPCServiceProcessorTest {
final String sg0 = "root.sg0";
final String sg1 = "root.sg1";
- // failed because there are not enough DataNodes
- TSetStorageGroupReq setReq0 = new TSetStorageGroupReq(new
TStorageGroupSchema(sg0));
- status = processor.setStorageGroup(setReq0);
- Assert.assertEquals(TSStatusCode.NOT_ENOUGH_DATA_NODE.getStatusCode(),
status.getCode());
- Assert.assertEquals("DataNode is not enough, please register more.",
status.getMessage());
-
// register DataNodes
registerDataNodes();
// set StorageGroup0 by default values
+ TSetStorageGroupReq setReq0 = new TSetStorageGroupReq(new
TStorageGroupSchema(sg0));
status = processor.setStorageGroup(setReq0);
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
index 330ea703c5..0bc5556c15 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
@@ -85,4 +85,24 @@ public abstract class ConsensusGroupId {
public static TConsensusGroupId convertToTConsensusGroupId(ConsensusGroupId
consensusGroupId) {
return new TConsensusGroupId(consensusGroupId.getType(),
consensusGroupId.getId());
}
+
+ public static String formatTConsensusGroupId(TConsensusGroupId groupId) {
+ StringBuilder format = new StringBuilder();
+
+ switch (groupId.getType()) {
+ case SchemaRegion:
+ format.append("SchemaRegion");
+ break;
+ case DataRegion:
+ format.append("DataRegion");
+ break;
+ case PartitionRegion:
+ format.append("PartitionRegion");
+ break;
+ }
+
+ format.append("(").append(groupId.getId()).append(")");
+
+ return format.toString();
+ }
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index f7615c8983..f612f6732b 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -123,7 +123,8 @@ public enum TSStatusCode {
STORAGE_GROUP_ALREADY_EXISTS(902),
NOT_ENOUGH_DATA_NODE(903),
ERROR_GLOBAL_CONFIG(904),
- APPLY_CONFIGNODE_FAILED(905);
+ APPLY_CONFIGNODE_FAILED(905),
+ PERSISTENCE_FAILURE(906);
private int statusCode;
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift
b/thrift-confignode/src/main/thrift/confignode.thrift
index a61e425391..07642289d9 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -96,8 +96,10 @@ struct TStorageGroupSchema {
3: optional i32 schemaReplicationFactor
4: optional i32 dataReplicationFactor
5: optional i64 timePartitionInterval
- 6: optional list<common.TConsensusGroupId> dataRegionGroupIds
- 7: optional list<common.TConsensusGroupId> schemaRegionGroupIds
+ 6: optional i32 maximumSchemaRegionCount
+ 7: optional i32 maximumDataRegionCount
+ 8: optional list<common.TConsensusGroupId> dataRegionGroupIds
+ 9: optional list<common.TConsensusGroupId> schemaRegionGroupIds
}
// Schema