This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a2436515e0 [IOTDB-3428] Linear expanse RegionGroup(simple version)
(#6325)
a2436515e0 is described below
commit a2436515e0820e1c8304595e88a0280fb89fbe23
Author: YongzaoDan <[email protected]>
AuthorDate: Thu Jun 23 18:22:33 2022 +0800
[IOTDB-3428] Linear expanse RegionGroup(simple version) (#6325)
---
.../confignode/client/AsyncDataNodeClientPool.java | 10 +-
.../consensus/request/ConfigRequest.java | 6 +-
.../consensus/request/ConfigRequestType.java | 3 +-
.../write/AdjustMaxRegionGroupCountReq.java | 89 +++++++++
.../consensus/request/write/CreateRegionsReq.java | 28 +--
.../confignode/manager/ClusterSchemaManager.java | 147 +++++++++++---
.../iotdb/confignode/manager/NodeManager.java | 7 +
.../iotdb/confignode/manager/PartitionManager.java | 222 +++++++++++++--------
.../iotdb/confignode/manager/load/LoadManager.java | 28 ++-
.../manager/load/balancer/RegionBalancer.java | 19 +-
.../confignode/persistence/ClusterSchemaInfo.java | 171 +++++++++++-----
.../executor/ConfigRequestExecutor.java | 7 +-
.../persistence/partition/PartitionInfo.java | 39 +++-
.../persistence/partition/RegionGroup.java | 11 +-
.../partition/StorageGroupPartitionTable.java | 42 +++-
.../thrift/ConfigNodeRPCServiceProcessor.java | 6 +-
.../consensus/request/ConfigRequestSerDeTest.java | 18 +-
.../confignode/persistence/PartitionInfoTest.java | 20 +-
.../src/main/thrift/confignode.thrift | 4 +-
19 files changed, 636 insertions(+), 241 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
index a63d4367b5..9ecab37d8a 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
@@ -64,10 +64,10 @@ public class AsyncDataNodeClientPool {
/**
* Execute CreateRegionsReq asynchronously
*
- * @param createRegionsReq CreateRegionsReq
+ * @param createRegionGroupsReq CreateRegionsReq
* @param ttlMap Map<StorageGroupName, TTL>
*/
- public void createRegions(CreateRegionsReq createRegionsReq, Map<String,
Long> ttlMap) {
+ public void createRegions(CreateRegionsReq createRegionGroupsReq,
Map<String, Long> ttlMap) {
// Index of each Region
int index = 0;
@@ -78,7 +78,7 @@ public class AsyncDataNodeClientPool {
// Assign an independent index to each Region
for (Map.Entry<String, List<TRegionReplicaSet>> entry :
- createRegionsReq.getRegionMap().entrySet()) {
+ createRegionGroupsReq.getRegionGroupMap().entrySet()) {
for (TRegionReplicaSet regionReplicaSet : entry.getValue()) {
regionNum += regionReplicaSet.getDataNodeLocationsSize();
for (TDataNodeLocation dataNodeLocation :
regionReplicaSet.getDataNodeLocations()) {
@@ -93,8 +93,8 @@ public class AsyncDataNodeClientPool {
BitSet bitSet = new BitSet(regionNum);
for (int retry = 0; retry < 3; retry++) {
CountDownLatch latch = new CountDownLatch(regionNum -
bitSet.cardinality());
- createRegionsReq
- .getRegionMap()
+ createRegionGroupsReq
+ .getRegionGroupMap()
.forEach(
(storageGroup, regionReplicaSets) -> {
// Enumerate each RegionReplicaSet
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java
index 02cb72d761..ec1e50e793 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java
@@ -28,6 +28,7 @@ import
org.apache.iotdb.confignode.consensus.request.read.GetOrCreateSchemaParti
import
org.apache.iotdb.confignode.consensus.request.read.GetRegionLocationsReq;
import
org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionReq;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupReq;
+import
org.apache.iotdb.confignode.consensus.request.write.AdjustMaxRegionGroupCountReq;
import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodeReq;
import
org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionReq;
import org.apache.iotdb.confignode.consensus.request.write.CreateFunctionReq;
@@ -118,13 +119,16 @@ public abstract class ConfigRequest implements
IConsensusRequest {
case SetTimePartitionInterval:
req = new SetTimePartitionIntervalReq();
break;
+ case AdjustMaxRegionGroupCount:
+ req = new AdjustMaxRegionGroupCountReq();
+ break;
case CountStorageGroup:
req = new CountStorageGroupReq();
break;
case GetStorageGroup:
req = new GetStorageGroupReq();
break;
- case CreateRegions:
+ case CreateRegionGroups:
req = new CreateRegionsReq();
break;
case DeleteRegions:
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
index 9deadb7165..6197a6e425 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
@@ -26,11 +26,12 @@ public enum ConfigRequestType {
SetSchemaReplicationFactor,
SetDataReplicationFactor,
SetTimePartitionInterval,
+ AdjustMaxRegionGroupCount,
DeleteStorageGroup,
PreDeleteStorageGroup,
GetStorageGroup,
CountStorageGroup,
- CreateRegions,
+ CreateRegionGroups,
DeleteRegions,
GetSchemaPartition,
CreateSchemaPartition,
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/AdjustMaxRegionGroupCountReq.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/AdjustMaxRegionGroupCountReq.java
new file mode 100644
index 0000000000..d8c3b92010
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/AdjustMaxRegionGroupCountReq.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.consensus.request.write;
+
+import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+public class AdjustMaxRegionGroupCountReq extends ConfigRequest {
+
+ // Map<StorageGroupName, Pair<maxSchemaRegionGroupCount,
maxDataRegionGroupCount>>
+ public final Map<String, Pair<Integer, Integer>> maxRegionGroupCountMap;
+
+ public AdjustMaxRegionGroupCountReq() {
+ super(ConfigRequestType.AdjustMaxRegionGroupCount);
+ this.maxRegionGroupCountMap = new HashMap<>();
+ }
+
+ public void putEntry(String storageGroup, Pair<Integer, Integer>
maxRegionGroupCount) {
+ maxRegionGroupCountMap.put(storageGroup, maxRegionGroupCount);
+ }
+
+ public Map<String, Pair<Integer, Integer>> getMaxRegionGroupCountMap() {
+ return maxRegionGroupCountMap;
+ }
+
+ @Override
+ protected void serializeImpl(DataOutputStream stream) throws IOException {
+
ReadWriteIOUtils.write(ConfigRequestType.AdjustMaxRegionGroupCount.ordinal(),
stream);
+
+ ReadWriteIOUtils.write(maxRegionGroupCountMap.size(), stream);
+ for (Map.Entry<String, Pair<Integer, Integer>> maxRegionGroupCountEntry :
+ maxRegionGroupCountMap.entrySet()) {
+ ReadWriteIOUtils.write(maxRegionGroupCountEntry.getKey(), stream);
+ ReadWriteIOUtils.write(maxRegionGroupCountEntry.getValue().getLeft(),
stream);
+ ReadWriteIOUtils.write(maxRegionGroupCountEntry.getValue().getRight(),
stream);
+ }
+ }
+
+ @Override
+ protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+ int storageGroupNum = buffer.getInt();
+
+ for (int i = 0; i < storageGroupNum; i++) {
+ String storageGroup = ReadWriteIOUtils.readString(buffer);
+ int maxSchemaRegionGroupCount = buffer.getInt();
+ int maxDataRegionGroupCount = buffer.getInt();
+ maxRegionGroupCountMap.put(
+ storageGroup, new Pair<>(maxSchemaRegionGroupCount,
maxDataRegionGroupCount));
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ AdjustMaxRegionGroupCountReq that = (AdjustMaxRegionGroupCountReq) o;
+ return maxRegionGroupCountMap.equals(that.maxRegionGroupCountMap);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(maxRegionGroupCountMap);
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionsReq.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionsReq.java
index 7c9cb3e16a..25abedc11f 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionsReq.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionsReq.java
@@ -38,29 +38,29 @@ import java.util.TreeMap;
public class CreateRegionsReq extends ConfigRequest {
// Map<StorageGroupName, List<TRegionReplicaSet>>
- private final Map<String, List<TRegionReplicaSet>> regionMap;
+ private final Map<String, List<TRegionReplicaSet>> regionGroupMap;
public CreateRegionsReq() {
- super(ConfigRequestType.CreateRegions);
- this.regionMap = new TreeMap<>();
+ super(ConfigRequestType.CreateRegionGroups);
+ this.regionGroupMap = new TreeMap<>();
}
- public Map<String, List<TRegionReplicaSet>> getRegionMap() {
- return regionMap;
+ public Map<String, List<TRegionReplicaSet>> getRegionGroupMap() {
+ return regionGroupMap;
}
- public void addRegion(String storageGroup, TRegionReplicaSet
regionReplicaSet) {
- regionMap
+ public void addRegionGroup(String storageGroup, TRegionReplicaSet
regionReplicaSet) {
+ regionGroupMap
.computeIfAbsent(storageGroup, regionReplicaSets -> new ArrayList<>())
.add(regionReplicaSet);
}
@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
- stream.writeInt(ConfigRequestType.CreateRegions.ordinal());
+ stream.writeInt(ConfigRequestType.CreateRegionGroups.ordinal());
- stream.writeInt(regionMap.size());
- for (Entry<String, List<TRegionReplicaSet>> entry : regionMap.entrySet()) {
+ stream.writeInt(regionGroupMap.size());
+ for (Entry<String, List<TRegionReplicaSet>> entry :
regionGroupMap.entrySet()) {
String storageGroup = entry.getKey();
List<TRegionReplicaSet> regionReplicaSets = entry.getValue();
BasicStructureSerDeUtil.write(storageGroup, stream);
@@ -76,13 +76,13 @@ public class CreateRegionsReq extends ConfigRequest {
int storageGroupNum = buffer.getInt();
for (int i = 0; i < storageGroupNum; i++) {
String storageGroup = BasicStructureSerDeUtil.readString(buffer);
- regionMap.put(storageGroup, new ArrayList<>());
+ regionGroupMap.put(storageGroup, new ArrayList<>());
int regionReplicaSetNum = buffer.getInt();
for (int j = 0; j < regionReplicaSetNum; j++) {
TRegionReplicaSet regionReplicaSet =
ThriftCommonsSerDeUtils.deserializeTRegionReplicaSet(buffer);
- regionMap.get(storageGroup).add(regionReplicaSet);
+ regionGroupMap.get(storageGroup).add(regionReplicaSet);
}
}
}
@@ -92,11 +92,11 @@ public class CreateRegionsReq extends ConfigRequest {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CreateRegionsReq that = (CreateRegionsReq) o;
- return regionMap.equals(that.regionMap);
+ return regionGroupMap.equals(that.regionGroupMap);
}
@Override
public int hashCode() {
- return Objects.hash(regionMap);
+ return Objects.hash(regionGroupMap);
}
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
index 0ca17a5e86..e50347c1ae 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
@@ -18,24 +18,25 @@
*/
package org.apache.iotdb.confignode.manager;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.confignode.consensus.request.read.CountStorageGroupReq;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupReq;
+import
org.apache.iotdb.confignode.consensus.request.write.AdjustMaxRegionGroupCountReq;
import
org.apache.iotdb.confignode.consensus.request.write.DeleteStorageGroupReq;
import
org.apache.iotdb.confignode.consensus.request.write.SetDataReplicationFactorReq;
import
org.apache.iotdb.confignode.consensus.request.write.SetSchemaReplicationFactorReq;
import org.apache.iotdb.confignode.consensus.request.write.SetStorageGroupReq;
import org.apache.iotdb.confignode.consensus.request.write.SetTTLReq;
import
org.apache.iotdb.confignode.consensus.request.write.SetTimePartitionIntervalReq;
-import org.apache.iotdb.confignode.consensus.response.CountStorageGroupResp;
-import org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaResp;
import org.apache.iotdb.confignode.exception.StorageGroupNotExistsException;
import org.apache.iotdb.confignode.persistence.ClusterSchemaInfo;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
-import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,6 +57,10 @@ public class ClusterSchemaManager {
this.clusterSchemaInfo = clusterSchemaInfo;
}
+ // ======================================================
+ // Consensus read/write interfaces
+ // ======================================================
+
/**
* Set StorageGroup
*
@@ -77,36 +82,38 @@ public class ClusterSchemaManager {
result.setMessage(metadataException.getMessage());
return result;
}
- // Persist StorageGroupSchema
+
+ // Cache StorageGroupSchema
result = getConsensusManager().write(setStorageGroupReq).getStatus();
+
+ // Adjust the maximum RegionGroup number of each StorageGroup
+ adjustMaxRegionGroupCount();
+
return result;
}
public TSStatus deleteStorageGroup(DeleteStorageGroupReq
deleteStorageGroupReq) {
+ // Adjust the maximum RegionGroup number of each StorageGroup
+ adjustMaxRegionGroupCount();
return getConsensusManager().write(deleteStorageGroupReq).getStatus();
}
/**
- * Only leader use this interface.
+ * Count StorageGroups by specific path pattern
*
- * @param storageGroup StorageGroupName
- * @return The specific StorageGroupSchema
- * @throws StorageGroupNotExistsException When the specific StorageGroup
doesn't exist
+ * @return CountStorageGroupResp
*/
- public TStorageGroupSchema getStorageGroupSchemaByName(String storageGroup)
- throws StorageGroupNotExistsException {
- return clusterSchemaInfo.getMatchedStorageGroupSchemaByName(storageGroup);
+ public DataSet countMatchedStorageGroups(CountStorageGroupReq
countStorageGroupReq) {
+ return getConsensusManager().read(countStorageGroupReq).getDataset();
}
/**
- * Only leader use this interface.
+ * Get StorageGroupSchemas by specific path pattern
*
- * @param rawPathList List<StorageGroupName>
- * @return the matched StorageGroupSchemas
+ * @return StorageGroupSchemaDataSet
*/
- public Map<String, TStorageGroupSchema> getMatchedStorageGroupSchemasByName(
- List<String> rawPathList) {
- return clusterSchemaInfo.getMatchedStorageGroupSchemasByName(rawPathList);
+ public DataSet getMatchedStorageGroupSchema(GetStorageGroupReq
getStorageGroupReq) {
+ return getConsensusManager().read(getStorageGroupReq).getDataset();
}
public TSStatus setTTL(SetTTLReq setTTLReq) {
@@ -133,31 +140,113 @@ public class ClusterSchemaManager {
}
/**
- * Count StorageGroups by specific path pattern
+ * Only leader use this interface. Adjust the maxSchemaRegionGroupCount and
+ * maxDataRegionGroupCount of each StorageGroup based on existing cluster
resources
+ */
+ public synchronized void adjustMaxRegionGroupCount() {
+ // Get all StorageGroupSchemas
+ Map<String, TStorageGroupSchema> storageGroupSchemaMap =
+ getMatchedStorageGroupSchemasByName(getStorageGroupNames());
+ int dataNodeNum = getNodeManager().getOnlineDataNodeCount();
+ int totalCpuCoreNum = getNodeManager().getTotalCpuCoreCount();
+ int storageGroupNum = storageGroupSchemaMap.size();
+
+ AdjustMaxRegionGroupCountReq adjustMaxRegionGroupCountReq = new
AdjustMaxRegionGroupCountReq();
+ for (TStorageGroupSchema storageGroupSchema :
storageGroupSchemaMap.values()) {
+ try {
+ // Adjust maxSchemaRegionGroupCount.
+ // All StorageGroups share the DataNodes equally.
+ // Allocated SchemaRegionGroups are not shrunk.
+ int allocatedSchemaRegionGroupCount =
+ getPartitionManager()
+ .getRegionCount(storageGroupSchema.getName(),
TConsensusGroupType.SchemaRegion);
+ int maxSchemaRegionGroupCount =
+ Math.max(
+ 1,
+ Math.max(
+ dataNodeNum
+ / (storageGroupNum *
storageGroupSchema.getSchemaReplicationFactor()),
+ allocatedSchemaRegionGroupCount));
+
+ // Adjust maxDataRegionGroupCount.
+ // All StorageGroups divide one-third of the total cpu cores equally.
+ // Allocated DataRegionGroups are not shrunk.
+ int allocatedDataRegionGroupCount =
+ getPartitionManager()
+ .getRegionCount(storageGroupSchema.getName(),
TConsensusGroupType.DataRegion);
+ int maxDataRegionGroupCount =
+ Math.max(
+ 2,
+ Math.max(
+ totalCpuCoreNum
+ / (3 * storageGroupNum *
storageGroupSchema.getDataReplicationFactor()),
+ allocatedDataRegionGroupCount));
+
+ adjustMaxRegionGroupCountReq.putEntry(
+ storageGroupSchema.getName(),
+ new Pair<>(maxSchemaRegionGroupCount, maxDataRegionGroupCount));
+ } catch (StorageGroupNotExistsException e) {
+ LOGGER.warn("Adjust maxRegionGroupCount failed because StorageGroup
doesn't exist", e);
+ }
+ }
+ getConsensusManager().write(adjustMaxRegionGroupCountReq);
+ }
+
+ // ======================================================
+ // Leader scheduling interfaces
+ // ======================================================
+
+ /**
+ * Only leader use this interface.
*
- * @return CountStorageGroupResp
+ * @param storageGroup StorageGroupName
+ * @return The specific StorageGroupSchema
+ * @throws StorageGroupNotExistsException When the specific StorageGroup
doesn't exist
*/
- public CountStorageGroupResp countMatchedStorageGroups(
- CountStorageGroupReq countStorageGroupReq) {
- ConsensusReadResponse readResponse =
getConsensusManager().read(countStorageGroupReq);
- return (CountStorageGroupResp) readResponse.getDataset();
+ public TStorageGroupSchema getStorageGroupSchemaByName(String storageGroup)
+ throws StorageGroupNotExistsException {
+ return clusterSchemaInfo.getMatchedStorageGroupSchemaByName(storageGroup);
}
/**
- * Get StorageGroupSchemas by specific path pattern
+ * Only leader use this interface.
*
- * @return StorageGroupSchemaDataSet
+ * @param rawPathList List<StorageGroupName>
+ * @return the matched StorageGroupSchemas
*/
- public StorageGroupSchemaResp getMatchedStorageGroupSchema(
- GetStorageGroupReq getStorageGroupReq) {
- ConsensusReadResponse readResponse =
getConsensusManager().read(getStorageGroupReq);
- return (StorageGroupSchemaResp) readResponse.getDataset();
+ public Map<String, TStorageGroupSchema> getMatchedStorageGroupSchemasByName(
+ List<String> rawPathList) {
+ return clusterSchemaInfo.getMatchedStorageGroupSchemasByName(rawPathList);
}
+ /**
+ * Only leader use this interface.
+ *
+ * @return List<StorageGroupName>, all storageGroups' name
+ */
public List<String> getStorageGroupNames() {
return clusterSchemaInfo.getStorageGroupNames();
}
+ /**
+ * Only leader use this interface. Get the maxRegionGroupCount of specific
StorageGroup.
+ *
+ * @param storageGroup StorageGroupName
+ * @param consensusGroupType SchemaRegion or DataRegion
+ * @return maxSchemaRegionGroupCount or maxDataRegionGroupCount
+ */
+ public int getMaxRegionGroupCount(String storageGroup, TConsensusGroupType
consensusGroupType) {
+ return clusterSchemaInfo.getMaxRegionGroupCount(storageGroup,
consensusGroupType);
+ }
+
+ private NodeManager getNodeManager() {
+ return configManager.getNodeManager();
+ }
+
+ private PartitionManager getPartitionManager() {
+ return configManager.getPartitionManager();
+ }
+
private ConsensusManager getConsensusManager() {
return configManager.getConsensusManager();
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index 7b40b7fe52..6500c6bb05 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -111,6 +111,9 @@ public class NodeManager {
req.getInfo().getLocation().setDataNodeId(nodeInfo.generateNextNodeId());
ConsensusWriteResponse resp = getConsensusManager().write(req);
dataSet.setStatus(resp.getStatus());
+
+ // Adjust the maximum RegionGroup number of each StorageGroup
+ getClusterSchemaManager().adjustMaxRegionGroupCount();
}
dataSet.setDataNodeId(req.getInfo().getLocation().getDataNodeId());
@@ -266,6 +269,10 @@ public class NodeManager {
return configManager.getConsensusManager();
}
+ private ClusterSchemaManager getClusterSchemaManager() {
+ return configManager.getClusterSchemaManager();
+ }
+
public void registerListener(final ChangeServerListener serverListener) {
listeners.add(serverListener);
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index ac1ae22eea..5b82738703 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -59,6 +59,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -146,26 +147,11 @@ public class PartitionManager {
// Otherwise, first ensure that each StorageGroup has at least one
SchemaRegion.
// This block of code is still parallel and concurrent safe.
// Thus, we can prepare the SchemaRegions with maximum efficiency.
- try {
- checkAndAllocateRegionsIfNecessary(
- new ArrayList<>(req.getPartitionSlotsMap().keySet()),
TConsensusGroupType.SchemaRegion);
- } catch (NotEnoughDataNodeException e) {
- resp.setStatus(
- new TSStatus(TSStatusCode.NOT_ENOUGH_DATA_NODE.getStatusCode())
- .setMessage(
- "ConfigNode failed to allocate DataPartition because there
are not enough DataNodes"));
- return resp;
- } catch (TimeoutException e) {
- resp.setStatus(
- new TSStatus(TSStatusCode.TIME_OUT.getStatusCode())
- .setMessage(
- "ConfigNode failed to allocate DataPartition because waiting
for another thread's Region allocation timeout."));
- return resp;
- } catch (StorageGroupNotExistsException e) {
- resp.setStatus(
- new TSStatus(TSStatusCode.STORAGE_GROUP_NOT_EXIST.getStatusCode())
- .setMessage(
- "ConfigNode failed to allocate DataPartition because some
StorageGroup doesn't exist."));
+ TSStatus status =
+ initializeRegionsIfNecessary(
+ new ArrayList<>(req.getPartitionSlotsMap().keySet()),
TConsensusGroupType.SchemaRegion);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ resp.setStatus(status);
return resp;
}
@@ -189,7 +175,9 @@ public class PartitionManager {
}
}
- // TODO: Allocate more SchemaRegions if necessary
+ // Finally, if some StorageGroups own too many slots, extend SchemaRegion
for them.
+ extendRegionsIfNecessary(
+ new ArrayList<>(req.getPartitionSlotsMap().keySet()),
TConsensusGroupType.SchemaRegion);
return getSchemaPartition(req);
}
@@ -215,26 +203,11 @@ public class PartitionManager {
// Otherwise, first ensure that each StorageGroup has at least one
DataRegion.
// This block of code is still parallel and concurrent safe.
// Thus, we can prepare the DataRegions with maximum efficiency.
- try {
- checkAndAllocateRegionsIfNecessary(
- new ArrayList<>(req.getPartitionSlotsMap().keySet()),
TConsensusGroupType.DataRegion);
- } catch (NotEnoughDataNodeException e) {
- resp.setStatus(
- new TSStatus(TSStatusCode.NOT_ENOUGH_DATA_NODE.getStatusCode())
- .setMessage(
- "ConfigNode failed to allocate DataPartition because there
are not enough DataNodes"));
- return resp;
- } catch (TimeoutException e) {
- resp.setStatus(
- new TSStatus(TSStatusCode.TIME_OUT.getStatusCode())
- .setMessage(
- "ConfigNode failed to allocate DataPartition because waiting
for another thread's Region allocation timeout."));
- return resp;
- } catch (StorageGroupNotExistsException e) {
- resp.setStatus(
- new TSStatus(TSStatusCode.STORAGE_GROUP_NOT_EXIST.getStatusCode())
- .setMessage(
- "ConfigNode failed to allocate DataPartition because some
StorageGroup doesn't exist."));
+ TSStatus status =
+ initializeRegionsIfNecessary(
+ new ArrayList<>(req.getPartitionSlotsMap().keySet()),
TConsensusGroupType.DataRegion);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ resp.setStatus(status);
return resp;
}
@@ -259,14 +232,40 @@ public class PartitionManager {
}
}
- // TODO: Allocate more Regions if necessary
+ // Finally, if some StorageGroups own too many slots, extend DataRegion
for them.
+ extendRegionsIfNecessary(
+ new ArrayList<>(req.getPartitionSlotsMap().keySet()),
TConsensusGroupType.DataRegion);
return getDataPartition(req);
}
+ // ======================================================
+ // Leader scheduling interfaces
+ // ======================================================
+
+ /** Handle the exceptions from initializeRegions */
+ private TSStatus initializeRegionsIfNecessary(
+ List<String> storageGroups, TConsensusGroupType consensusGroupType) {
+ try {
+ initializeRegions(storageGroups, consensusGroupType);
+ } catch (NotEnoughDataNodeException e) {
+ return new TSStatus(TSStatusCode.NOT_ENOUGH_DATA_NODE.getStatusCode())
+ .setMessage(
+ "ConfigNode failed to allocate Partition because there are not
enough DataNodes");
+ } catch (TimeoutException e) {
+ return new TSStatus(TSStatusCode.TIME_OUT.getStatusCode())
+ .setMessage(
+ "ConfigNode failed to allocate Partition because waiting for
another thread's Region allocation timeout.");
+ } catch (StorageGroupNotExistsException e) {
+ return new TSStatus(TSStatusCode.STORAGE_GROUP_NOT_EXIST.getStatusCode())
+ .setMessage(
+ "ConfigNode failed to allocate DataPartition because some
StorageGroup doesn't exist.");
+ }
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+
/**
- * Check whether each StorageGroup already has a Region of the specified
type, and allocate one
- * Region for each StorageGroup who doesn't have any.
+ * Initialize one Region for each StorageGroup who doesn't have any.
*
* @param storageGroups List<StorageGroupName>
* @param consensusGroupType SchemaRegion or DataRegion
@@ -275,12 +274,11 @@ public class PartitionManager {
* @throws TimeoutException When waiting other threads to allocate Regions
for too long
* @throws StorageGroupNotExistsException When some StorageGroups don't exist
*/
- private void checkAndAllocateRegionsIfNecessary(
- List<String> storageGroups, TConsensusGroupType consensusGroupType)
+ private void initializeRegions(List<String> storageGroups,
TConsensusGroupType consensusGroupType)
throws NotEnoughDataNodeException, TimeoutException,
StorageGroupNotExistsException {
int leastDataNode = 0;
- List<String> unreadyStorageGroups = new ArrayList<>();
+ Map<String, Integer> unreadyStorageGroupMap = new HashMap<>();
for (String storageGroup : storageGroups) {
if (getRegionCount(storageGroup, consensusGroupType) == 0) {
// Update leastDataNode
@@ -296,7 +294,7 @@ public class PartitionManager {
}
// Recording StorageGroups without Region
- unreadyStorageGroups.add(storageGroup);
+ unreadyStorageGroupMap.put(storageGroup, 1);
}
}
if (getNodeManager().getOnlineDataNodeCount() < leastDataNode) {
@@ -304,46 +302,110 @@ public class PartitionManager {
throw new NotEnoughDataNodeException();
}
- List<String> storageGroupsNeedAllocation = new ArrayList<>();
- List<String> storageGroupsInAllocation = new ArrayList<>();
- for (String storageGroup : unreadyStorageGroups) {
+ doOrWaitRegionCreation(unreadyStorageGroupMap, consensusGroupType);
+ }
+
+ /** Handle the exceptions from extendRegions */
+ private void extendRegionsIfNecessary(
+ List<String> storageGroups, TConsensusGroupType consensusGroupType) {
+ try {
+ extendRegions(storageGroups, consensusGroupType);
+ } catch (NotEnoughDataNodeException e) {
+ LOGGER.error("ConfigNode failed to extend Region because there are not
enough DataNodes");
+ } catch (TimeoutException e) {
+ LOGGER.error(
+ "ConfigNode failed to extend Region because waiting for another
thread's Region allocation timeout.");
+ } catch (StorageGroupNotExistsException e) {
+ LOGGER.error("ConfigNode failed to extend Region because some
StorageGroup doesn't exist.");
+ }
+ }
+
+ /**
+ * Allocate more Regions to StorageGroups who have too many slots.
+ *
+ * @param storageGroups List<StorageGroupName>
+ * @param consensusGroupType SchemaRegion or DataRegion
+ * @throws StorageGroupNotExistsException When some StorageGroups don't exist
+ * @throws NotEnoughDataNodeException When the number of online DataNodes
is too small to
+ * allocate Regions
+ * @throws TimeoutException When waiting other threads to allocate Regions
for too long
+ */
+ private void extendRegions(List<String> storageGroups, TConsensusGroupType
consensusGroupType)
+ throws StorageGroupNotExistsException, NotEnoughDataNodeException,
TimeoutException {
+ // Map<StorageGroup, Region allotment>
+ Map<String, Integer> filledStorageGroupMap = new HashMap<>();
+ for (String storageGroup : storageGroups) {
+ float regionCount = partitionInfo.getRegionCount(storageGroup,
consensusGroupType);
+ float slotCount = partitionInfo.getSlotCount(storageGroup);
+ float maxRegionCount =
+ getClusterSchemaManager().getMaxRegionGroupCount(storageGroup,
consensusGroupType);
+ float maxSlotCount =
ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionSlotNum();
+
+ // Need extension
+ if (regionCount < maxRegionCount && slotCount / regionCount >
maxSlotCount / maxRegionCount) {
+ // The delta is equal to the smallest integer solution that satisfies
the inequality:
+ // slotCount / (regionCount + delta) < maxSlotCount / maxRegionCount
+ int delta =
+ Math.min(
+ (int) (maxRegionCount - regionCount),
+ Math.max(
+ 1, (int) Math.ceil(slotCount * maxRegionCount /
maxSlotCount - regionCount)));
+ filledStorageGroupMap.put(storageGroup, delta);
+ }
+ }
+
+ doOrWaitRegionCreation(filledStorageGroupMap, consensusGroupType);
+ }
+
+ /**
+ * Do Region creation for those StorageGroups who get the allocation
particle, for those who
+ * don't, wait until other threads finish the creation process.
+ *
+ * @param allotmentMap Map<StorageGroup, Region allotment>
+ * @param consensusGroupType SchemaRegion or DataRegion
+ * @throws NotEnoughDataNodeException When the number of online DataNodes
is too small to
+ * allocate Regions
+ * @throws StorageGroupNotExistsException When some StorageGroups don't exist
+ * @throws TimeoutException When waiting other threads to allocate Regions
for too long
+ */
+ private void doOrWaitRegionCreation(
+ Map<String, Integer> allotmentMap, TConsensusGroupType
consensusGroupType)
+ throws NotEnoughDataNodeException, StorageGroupNotExistsException,
TimeoutException {
+ // StorageGroups who get the allocation particle
+ Map<String, Integer> allocateMap = new HashMap<>();
+ // StorageGroups who don't get the allocation particle
+ List<String> waitingList = new ArrayList<>();
+ for (String storageGroup : allotmentMap.keySet()) {
// Try to get the allocation particle
- if (partitionInfo.getRegionAllocationParticle(storageGroup,
consensusGroupType)) {
- storageGroupsNeedAllocation.add(storageGroup);
+ if (partitionInfo.contendRegionAllocationParticle(storageGroup,
consensusGroupType)) {
+ // Initialize one Region
+ allocateMap.put(storageGroup, allotmentMap.get(storageGroup));
} else {
- storageGroupsInAllocation.add(storageGroup);
+ waitingList.add(storageGroup);
}
}
- // Calculate the number of Regions
- // TODO: This is a temporary code, delete it later
- int regionNum;
- if (consensusGroupType == TConsensusGroupType.SchemaRegion) {
- regionNum =
- Math.max(
- 1,
- getNodeManager().getOnlineDataNodeCount()
- /
ConfigNodeDescriptor.getInstance().getConf().getSchemaReplicationFactor());
- } else {
- regionNum =
- Math.max(
- 2,
- (int)
- (getNodeManager().getTotalCpuCoreCount()
- * 0.3
- /
ConfigNodeDescriptor.getInstance().getConf().getDataReplicationFactor()));
+ // TODO: Use procedure to protect the following process
+ // Do Region allocation and creation for those StorageGroups who get the
particle
+ getLoadManager().doRegionCreation(allocateMap, consensusGroupType);
+ // Put back particles after that
+ for (String storageGroup : allocateMap.keySet()) {
+ partitionInfo.putBackRegionAllocationParticle(storageGroup,
consensusGroupType);
}
- // Do Region allocation for those StorageGroups who get the particle
- // TODO: Use Procedure to put back Region allocation particles when
creation process failed.
- getLoadManager().initializeRegions(storageGroupsNeedAllocation,
consensusGroupType, regionNum);
- // TODO: Put back particles after creation
+ // Waiting Region creation for those StorageGroups who don't get the
particle
+ waitRegionCreation(waitingList, consensusGroupType);
+ }
- // Waiting allocation for those StorageGroups who don't get the particle
+ /** Wait for Region creation to finish for those StorageGroups that don't get the particle */
+ private void waitRegionCreation(List<String> waitingList,
TConsensusGroupType consensusGroupType)
+ throws TimeoutException {
for (int retry = 0; retry < 100; retry++) {
boolean allocationFinished = true;
- for (String storageGroup : storageGroupsInAllocation) {
- if (getRegionCount(storageGroup, consensusGroupType) == 0) {
+ for (String storageGroup : waitingList) {
+ if (!partitionInfo.getRegionAllocationParticle(storageGroup,
consensusGroupType)) {
+ // If a StorageGroup's Region allocation particle doesn't return,
+ // the Region creation process is not complete
allocationFinished = false;
break;
}
@@ -362,10 +424,6 @@ public class PartitionManager {
throw new TimeoutException("");
}
- // ======================================================
- // Leader scheduling interfaces
- // ======================================================
-
/**
* Only leader use this interface
*
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index c173cc1760..5be3588e0b 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -97,32 +97,28 @@ public class LoadManager {
}
/**
- * Allocate and create one Region for each StorageGroup. TODO: Use procedure
to protect create
- * Regions process
+ * Allocate and create Regions for each StorageGroup.
*
- * @param storageGroups List<StorageGroupName>
+ * @param allotmentMap Map<StorageGroupName, Region allotment>
* @param consensusGroupType TConsensusGroupType of Region to be allocated
- * @param regionNum The number of Regions
*/
- public void initializeRegions(
- List<String> storageGroups, TConsensusGroupType consensusGroupType, int
regionNum)
+ public void doRegionCreation(
+ Map<String, Integer> allotmentMap, TConsensusGroupType
consensusGroupType)
throws NotEnoughDataNodeException, StorageGroupNotExistsException {
- CreateRegionsReq createRegionsReq =
- regionBalancer.genRegionsAllocationPlan(storageGroups,
consensusGroupType, regionNum);
- createRegionsOnDataNodes(createRegionsReq);
+ CreateRegionsReq createRegionGroupsReq =
+ regionBalancer.genRegionsAllocationPlan(allotmentMap,
consensusGroupType);
- getConsensusManager().write(createRegionsReq);
- }
-
- private void createRegionsOnDataNodes(CreateRegionsReq createRegionsReq)
- throws StorageGroupNotExistsException {
+ // TODO: Use procedure to protect the following process
+ // Create Regions on DataNodes
Map<String, Long> ttlMap = new HashMap<>();
- for (String storageGroup : createRegionsReq.getRegionMap().keySet()) {
+ for (String storageGroup :
createRegionGroupsReq.getRegionGroupMap().keySet()) {
ttlMap.put(
storageGroup,
getClusterSchemaManager().getStorageGroupSchemaByName(storageGroup).getTTL());
}
- AsyncDataNodeClientPool.getInstance().createRegions(createRegionsReq,
ttlMap);
+ AsyncDataNodeClientPool.getInstance().createRegions(createRegionGroupsReq,
ttlMap);
+ // Persist the allocation result
+ getConsensusManager().write(createRegionGroupsReq);
}
/**
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
index 79e6656940..b4e7693721 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -34,6 +34,7 @@ import
org.apache.iotdb.confignode.manager.load.balancer.region.IRegionAllocator
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import java.util.List;
+import java.util.Map;
/**
* The RegionBalancer provides interfaces to generate optimal Region
allocation and migration plans
@@ -49,23 +50,25 @@ public class RegionBalancer {
/**
* Generate a Regions allocation plan(CreateRegionsReq)
*
- * @param storageGroups List<StorageGroup>
+ * @param allotmentMap Map<StorageGroupName, Region allotment>
* @param consensusGroupType TConsensusGroupType of the new Regions
- * @param regionNum Number of Regions to be allocated per StorageGroup
* @return CreateRegionsReq
* @throws NotEnoughDataNodeException When the number of DataNodes is not
enough for allocation
* @throws StorageGroupNotExistsException When some StorageGroups don't exist
*/
public CreateRegionsReq genRegionsAllocationPlan(
- List<String> storageGroups, TConsensusGroupType consensusGroupType, int
regionNum)
+ Map<String, Integer> allotmentMap, TConsensusGroupType
consensusGroupType)
throws NotEnoughDataNodeException, StorageGroupNotExistsException {
- CreateRegionsReq createRegionsReq = new CreateRegionsReq();
+ CreateRegionsReq createRegionGroupsReq = new CreateRegionsReq();
IRegionAllocator regionAllocator = genRegionAllocator();
List<TDataNodeInfo> onlineDataNodes =
getNodeManager().getOnlineDataNodes(-1);
List<TRegionReplicaSet> allocatedRegions =
getPartitionManager().getAllReplicaSets();
- for (String storageGroup : storageGroups) {
+ for (Map.Entry<String, Integer> entry : allotmentMap.entrySet()) {
+ String storageGroup = entry.getKey();
+ int allotment = entry.getValue();
+
// Get schema
TStorageGroupSchema storageGroupSchema =
getClusterSchemaManager().getStorageGroupSchemaByName(storageGroup);
@@ -79,7 +82,7 @@ public class RegionBalancer {
throw new NotEnoughDataNodeException();
}
- for (int i = 0; i < regionNum; i++) {
+ for (int i = 0; i < allotment; i++) {
// Generate allocation plan
TRegionReplicaSet newRegion =
regionAllocator.allocateRegion(
@@ -88,13 +91,13 @@ public class RegionBalancer {
replicationFactor,
new TConsensusGroupId(
consensusGroupType,
getPartitionManager().generateNextRegionGroupId()));
- createRegionsReq.addRegion(storageGroup, newRegion);
+ createRegionGroupsReq.addRegionGroup(storageGroup, newRegion);
allocatedRegions.add(newRegion);
}
}
- return createRegionsReq;
+ return createRegionGroupsReq;
}
private IRegionAllocator genRegionAllocator() {
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfo.java
index e3bb52823b..bd292f1061 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfo.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfo.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.confignode.persistence;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
@@ -25,6 +26,7 @@ import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.consensus.request.read.CountStorageGroupReq;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupReq;
+import
org.apache.iotdb.confignode.consensus.request.write.AdjustMaxRegionGroupCountReq;
import
org.apache.iotdb.confignode.consensus.request.write.DeleteStorageGroupReq;
import
org.apache.iotdb.confignode.consensus.request.write.SetDataReplicationFactorReq;
import
org.apache.iotdb.confignode.consensus.request.write.SetSchemaReplicationFactorReq;
@@ -82,6 +84,10 @@ public class ClusterSchemaInfo implements SnapshotProcessor {
}
}
+ // ======================================================
+ // Consensus read/write interfaces
+ // ======================================================
+
/**
* Cache StorageGroupSchema
*
@@ -142,6 +148,51 @@ public class ClusterSchemaInfo implements
SnapshotProcessor {
return result;
}
+ /** @return The number of matched StorageGroups by the specific StorageGroup
pattern */
+ public CountStorageGroupResp countMatchedStorageGroups(CountStorageGroupReq
req) {
+ CountStorageGroupResp result = new CountStorageGroupResp();
+ storageGroupReadWriteLock.readLock().lock();
+ try {
+ PartialPath patternPath = new PartialPath(req.getStorageGroupPattern());
+ result.setCount(mTree.getBelongedStorageGroups(patternPath).size());
+ result.setStatus(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+ } catch (MetadataException e) {
+ LOGGER.error("Error StorageGroup name", e);
+ result.setStatus(
+ new TSStatus(TSStatusCode.STORAGE_GROUP_NOT_EXIST.getStatusCode())
+ .setMessage("Error StorageGroup name"));
+ } finally {
+ storageGroupReadWriteLock.readLock().unlock();
+ }
+ return result;
+ }
+
+ /** @return All StorageGroupSchemas that matches to the specific
StorageGroup pattern */
+ public StorageGroupSchemaResp
getMatchedStorageGroupSchemas(GetStorageGroupReq req) {
+ StorageGroupSchemaResp result = new StorageGroupSchemaResp();
+ storageGroupReadWriteLock.readLock().lock();
+ try {
+ Map<String, TStorageGroupSchema> schemaMap = new HashMap<>();
+ PartialPath patternPath = new PartialPath(req.getStorageGroupPattern());
+ List<PartialPath> matchedPaths =
mTree.getBelongedStorageGroups(patternPath);
+ for (PartialPath path : matchedPaths) {
+ schemaMap.put(
+ path.getFullPath(),
+
mTree.getStorageGroupNodeByStorageGroupPath(path).getStorageGroupSchema());
+ }
+ result.setSchemaMap(schemaMap);
+ result.setStatus(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+ } catch (MetadataException e) {
+ LOGGER.error("Error StorageGroup name", e);
+ result.setStatus(
+ new TSStatus(TSStatusCode.STORAGE_GROUP_NOT_EXIST.getStatusCode())
+ .setMessage("Error StorageGroup name"));
+ } finally {
+ storageGroupReadWriteLock.readLock().unlock();
+ }
+ return result;
+ }
+
public TSStatus setTTL(SetTTLReq req) {
TSStatus result = new TSStatus();
storageGroupReadWriteLock.writeLock().lock();
@@ -243,66 +294,63 @@ public class ClusterSchemaInfo implements
SnapshotProcessor {
return result;
}
- /** @return List<StorageGroupName>, all storageGroups' name */
- public List<String> getStorageGroupNames() {
- List<String> storageGroups = new ArrayList<>();
- storageGroupReadWriteLock.readLock().lock();
+ /**
+ * Adjust the maximum RegionGroup count of each StorageGroup
+ *
+ * @param req AdjustMaxRegionGroupCountReq
+ * @return SUCCESS_STATUS
+ */
+ public TSStatus adjustMaxRegionGroupCount(AdjustMaxRegionGroupCountReq req) {
+ TSStatus result = new TSStatus();
+ storageGroupReadWriteLock.writeLock().lock();
try {
- List<PartialPath> namePaths = mTree.getAllStorageGroupPaths();
- for (PartialPath path : namePaths) {
- storageGroups.add(path.getFullPath());
+ for (Map.Entry<String, Pair<Integer, Integer>> entry :
+ req.getMaxRegionGroupCountMap().entrySet()) {
+ PartialPath path = new PartialPath(entry.getKey());
+ TStorageGroupSchema storageGroupSchema =
+
mTree.getStorageGroupNodeByStorageGroupPath(path).getStorageGroupSchema();
+
storageGroupSchema.setMaxSchemaRegionGroupCount(entry.getValue().getLeft());
+
storageGroupSchema.setMaxDataRegionGroupCount(entry.getValue().getRight());
}
- } finally {
- storageGroupReadWriteLock.readLock().unlock();
- }
- return storageGroups;
- }
-
- /** @return The number of matched StorageGroups by the specific StorageGroup
pattern */
- public CountStorageGroupResp countMatchedStorageGroups(CountStorageGroupReq
req) {
- CountStorageGroupResp result = new CountStorageGroupResp();
- storageGroupReadWriteLock.readLock().lock();
- try {
- PartialPath patternPath = new PartialPath(req.getStorageGroupPattern());
- result.setCount(mTree.getBelongedStorageGroups(patternPath).size());
- result.setStatus(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+ result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (MetadataException e) {
LOGGER.error("Error StorageGroup name", e);
- result.setStatus(
- new TSStatus(TSStatusCode.STORAGE_GROUP_NOT_EXIST.getStatusCode())
- .setMessage("Error StorageGroup name"));
+ result.setCode(TSStatusCode.STORAGE_GROUP_NOT_EXIST.getStatusCode());
} finally {
- storageGroupReadWriteLock.readLock().unlock();
+ storageGroupReadWriteLock.writeLock().unlock();
}
return result;
}
- /** @return All StorageGroupSchemas that matches to the specific
StorageGroup pattern */
- public StorageGroupSchemaResp
getMatchedStorageGroupSchemas(GetStorageGroupReq req) {
- StorageGroupSchemaResp result = new StorageGroupSchemaResp();
+ // ======================================================
+ // Leader scheduling interfaces
+ // ======================================================
+
+ /**
+ * Only leader use this interface.
+ *
+ * @return List<StorageGroupName>, all storageGroups' name
+ */
+ public List<String> getStorageGroupNames() {
+ List<String> storageGroups = new ArrayList<>();
storageGroupReadWriteLock.readLock().lock();
try {
- Map<String, TStorageGroupSchema> schemaMap = new HashMap<>();
- PartialPath patternPath = new PartialPath(req.getStorageGroupPattern());
- List<PartialPath> matchedPaths =
mTree.getBelongedStorageGroups(patternPath);
- for (PartialPath path : matchedPaths) {
- schemaMap.put(
- path.getFullPath(),
-
mTree.getStorageGroupNodeByStorageGroupPath(path).getStorageGroupSchema());
+ List<PartialPath> namePaths = mTree.getAllStorageGroupPaths();
+ for (PartialPath path : namePaths) {
+ storageGroups.add(path.getFullPath());
}
- result.setSchemaMap(schemaMap);
- result.setStatus(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
- } catch (MetadataException e) {
- LOGGER.error("Error StorageGroup name", e);
- result.setStatus(
- new TSStatus(TSStatusCode.STORAGE_GROUP_NOT_EXIST.getStatusCode())
- .setMessage("Error StorageGroup name"));
} finally {
storageGroupReadWriteLock.readLock().unlock();
}
- return result;
+ return storageGroups;
}
+ /**
+ * Only leader use this interface. Check if the specific StorageGroup
already exists.
+ *
+ * @param storageName The specific StorageGroup's name
+ * @throws MetadataException If the specific StorageGroup already exists
+ */
public void checkContainsStorageGroup(String storageName) throws
MetadataException {
storageGroupReadWriteLock.readLock().lock();
try {
@@ -313,7 +361,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor
{
}
/**
- * Get the specific StorageGroupSchema
+ * Only leader use this interface. Get the specific StorageGroupSchema
*
* @param storageGroup StorageGroupName
* @return The specific StorageGroupSchema
@@ -333,7 +381,12 @@ public class ClusterSchemaInfo implements
SnapshotProcessor {
}
}
- /** @return All StorageGroupSchemas that matches to the specific
StorageGroup patterns */
+ /**
+ * Only leader use this interface. Get the matched StorageGroupSchemas.
+ *
+ * @param rawPathList StorageGroups' path patterns or full paths
+ * @return All StorageGroupSchemas that matches to the specific StorageGroup
patterns
+ */
public Map<String, TStorageGroupSchema> getMatchedStorageGroupSchemasByName(
List<String> rawPathList) {
Map<String, TStorageGroupSchema> schemaMap = new HashMap<>();
@@ -355,6 +408,34 @@ public class ClusterSchemaInfo implements
SnapshotProcessor {
return schemaMap;
}
+ /**
+ * Only leader use this interface. Get the maxRegionGroupCount of specific
StorageGroup.
+ *
+ * @param storageGroup StorageGroupName
+ * @param consensusGroupType SchemaRegion or DataRegion
+ * @return maxSchemaRegionGroupCount or maxDataRegionGroupCount
+ */
+ public int getMaxRegionGroupCount(String storageGroup, TConsensusGroupType
consensusGroupType) {
+ storageGroupReadWriteLock.readLock().lock();
+ try {
+ PartialPath path = new PartialPath(storageGroup);
+ TStorageGroupSchema storageGroupSchema =
+
mTree.getStorageGroupNodeByStorageGroupPath(path).getStorageGroupSchema();
+ switch (consensusGroupType) {
+ case SchemaRegion:
+ return storageGroupSchema.getMaxSchemaRegionGroupCount();
+ case DataRegion:
+ default:
+ return storageGroupSchema.getMaxDataRegionGroupCount();
+ }
+ } catch (MetadataException e) {
+ LOGGER.warn("Error StorageGroup name", e);
+ return -1;
+ } finally {
+ storageGroupReadWriteLock.readLock().unlock();
+ }
+ }
+
@Override
public boolean processTakeSnapshot(File snapshotDir) throws IOException {
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
index e861e010d4..2ec583ce12 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
@@ -31,6 +31,7 @@ import
org.apache.iotdb.confignode.consensus.request.read.GetNodePathsPartitionR
import
org.apache.iotdb.confignode.consensus.request.read.GetRegionLocationsReq;
import
org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionReq;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupReq;
+import
org.apache.iotdb.confignode.consensus.request.write.AdjustMaxRegionGroupCountReq;
import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodeReq;
import
org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionReq;
import org.apache.iotdb.confignode.consensus.request.write.CreateFunctionReq;
@@ -150,6 +151,8 @@ public class ConfigRequestExecutor {
return status;
}
return partitionInfo.setStorageGroup((SetStorageGroupReq) req);
+ case AdjustMaxRegionGroupCount:
+ return
clusterSchemaInfo.adjustMaxRegionGroupCount((AdjustMaxRegionGroupCountReq) req);
case DeleteStorageGroup:
partitionInfo.deleteStorageGroup((DeleteStorageGroupReq) req);
return clusterSchemaInfo.deleteStorageGroup((DeleteStorageGroupReq)
req);
@@ -163,8 +166,8 @@ public class ConfigRequestExecutor {
return
clusterSchemaInfo.setDataReplicationFactor((SetDataReplicationFactorReq) req);
case SetTimePartitionInterval:
return
clusterSchemaInfo.setTimePartitionInterval((SetTimePartitionIntervalReq) req);
- case CreateRegions:
- return partitionInfo.createRegions((CreateRegionsReq) req);
+ case CreateRegionGroups:
+ return partitionInfo.createRegionGroups((CreateRegionsReq) req);
case CreateSchemaPartition:
return partitionInfo.createSchemaPartition((CreateSchemaPartitionReq)
req);
case CreateDataPartition:
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index edb23f797d..552db2932f 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -163,26 +163,23 @@ public class PartitionInfo implements SnapshotProcessor {
String storageGroupName = req.getSchema().getName();
storageGroupPartitionTables.put(
storageGroupName, new StorageGroupPartitionTable(storageGroupName));
-
- LOGGER.info("Successfully set StorageGroup: {}", req.getSchema());
-
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
/**
- * Thread-safely cache allocation result of new Regions
+ * Thread-safely cache allocation result of new RegionGroups
*
- * @param req CreateRegionsReq
+ * @param req CreateRegionGroupsReq
* @return SUCCESS_STATUS
*/
- public TSStatus createRegions(CreateRegionsReq req) {
+ public TSStatus createRegionGroups(CreateRegionsReq req) {
TSStatus result;
AtomicInteger maxRegionId = new AtomicInteger(Integer.MIN_VALUE);
- req.getRegionMap()
+ req.getRegionGroupMap()
.forEach(
(storageGroup, regionReplicaSets) -> {
-
storageGroupPartitionTables.get(storageGroup).createRegions(regionReplicaSets);
+
storageGroupPartitionTables.get(storageGroup).createRegionGroups(regionReplicaSets);
regionReplicaSets.forEach(
regionReplicaSet ->
maxRegionId.set(
@@ -516,13 +513,37 @@ public class PartitionInfo implements SnapshotProcessor {
return storageGroupPartitionTables.get(storageGroup).getRegionCount(type);
}
+ /**
+ * Get the slot count of the specific StorageGroup.
+ *
+ * @param storageGroup StorageGroupName
+ * @return The number of slots, delegated to the StorageGroup's partition table
+ */
+ public int getSlotCount(String storageGroup) {
+ return storageGroupPartitionTables.get(storageGroup).getSlotsCount();
+ }
+
/**
- * Only leader use this interface. Contending the Region allocation particle
+ * Only leader use this interface. Contending the Region allocation particle.
*
* @param storageGroup StorageGroupName
* @param type SchemaRegion or DataRegion
* @return True when successfully get the allocation particle, false
otherwise
*/
+ public boolean contendRegionAllocationParticle(String storageGroup,
TConsensusGroupType type) {
+ return
storageGroupPartitionTables.get(storageGroup).contendRegionAllocationParticle(type);
+ }
+
+ /**
+ * Only leader use this interface. Put back the Region allocation particle.
+ *
+ * @param storageGroup StorageGroupName
+ * @param type SchemaRegion or DataRegion
+ */
+ public void putBackRegionAllocationParticle(String storageGroup,
TConsensusGroupType type) {
+
storageGroupPartitionTables.get(storageGroup).putBackRegionAllocationParticle(type);
+ }
+
+ /**
+ * Only leader use this interface. Get the Region allocation particle.
+ *
+ * @param storageGroup StorageGroupName
+ * @param type SchemaRegion or DataRegion
+ */
public boolean getRegionAllocationParticle(String storageGroup,
TConsensusGroupType type) {
return
storageGroupPartitionTables.get(storageGroup).getRegionAllocationParticle(type);
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
index cbe6a686f9..555a5195f8 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
@@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicLong;
public class RegionGroup {
- private final TConsensusGroupId id;
private final TRegionReplicaSet replicaSet;
// For DataRegion, each SeriesSlot * TimeSlot form a slot,
@@ -41,19 +40,17 @@ public class RegionGroup {
private final AtomicLong slotCount;
public RegionGroup() {
- this.id = new TConsensusGroupId();
this.replicaSet = new TRegionReplicaSet();
this.slotCount = new AtomicLong();
}
public RegionGroup(TRegionReplicaSet replicaSet) {
- this.id = replicaSet.getRegionId();
this.replicaSet = replicaSet;
this.slotCount = new AtomicLong(0);
}
public TConsensusGroupId getId() {
- return id;
+ return replicaSet.getRegionId();
}
public TRegionReplicaSet getReplicaSet() {
@@ -70,14 +67,12 @@ public class RegionGroup {
public void serialize(OutputStream outputStream, TProtocol protocol)
throws IOException, TException {
- id.write(protocol);
replicaSet.write(protocol);
ReadWriteIOUtils.write(slotCount.get(), outputStream);
}
public void deserialize(InputStream inputStream, TProtocol protocol)
throws IOException, TException {
- id.read(protocol);
replicaSet.read(protocol);
slotCount.set(ReadWriteIOUtils.readLong(inputStream));
}
@@ -87,11 +82,11 @@ public class RegionGroup {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RegionGroup that = (RegionGroup) o;
- return id.equals(that.id) && replicaSet.equals(that.replicaSet);
+ return replicaSet.equals(that.replicaSet);
}
@Override
public int hashCode() {
- return Objects.hash(id, replicaSet);
+ return Objects.hash(replicaSet);
}
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
index 1fd072b68e..cf082bc8ad 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
@@ -147,11 +147,11 @@ public class StorageGroupPartitionTable {
}
/**
- * Cache allocation result of new Regions
+ * Cache allocation result of new RegionGroups
*
* @param replicaSets List<TRegionReplicaSet>
*/
- public void createRegions(List<TRegionReplicaSet> replicaSets) {
+ public void createRegionGroups(List<TRegionReplicaSet> replicaSets) {
replicaSets.forEach(
replicaSet -> regionInfoMap.put(replicaSet.getRegionId(), new
RegionGroup(replicaSet)));
}
@@ -205,12 +205,12 @@ public class StorageGroupPartitionTable {
}
/**
- * Only leader use this interface. Contending the Region allocation particle
+ * Only leader use this interface. Contending the Region allocation particle.
*
* @param type SchemaRegion or DataRegion
* @return True when successfully get the allocation particle, false
otherwise
*/
- public boolean getRegionAllocationParticle(TConsensusGroupType type) {
+ public boolean contendRegionAllocationParticle(TConsensusGroupType type) {
switch (type) {
case SchemaRegion:
return schemaRegionParticle.getAndSet(false);
@@ -221,6 +221,40 @@ public class StorageGroupPartitionTable {
}
}
+ /**
+ * Only leader use this interface. Put back the Region allocation particle.
+ *
+ * @param type SchemaRegion or DataRegion
+ */
+ public void putBackRegionAllocationParticle(TConsensusGroupType type) {
+ switch (type) {
+ case SchemaRegion:
+ schemaRegionParticle.set(true);
+ // break is required: without it, putting back the SchemaRegion particle
+ // falls through and also releases the DataRegion particle, which another
+ // thread may still be holding during its Region creation
+ break;
+ case DataRegion:
+ dataRegionParticle.set(true);
+ break;
+ default:
+ break;
+ }
+ }
+
+ /**
+ * Only leader use this interface. Get the Region allocation particle.
+ *
+ * @param type SchemaRegion or DataRegion
+ * @return The current state of the particle: true when the particle has been put back
+ * (i.e. no Region creation appears to be in progress for this type), false while a
+ * contender holds it; false for unknown types — TODO confirm intended semantics
+ */
+ public boolean getRegionAllocationParticle(TConsensusGroupType type) {
+ switch (type) {
+ case SchemaRegion:
+ return schemaRegionParticle.get();
+ case DataRegion:
+ return dataRegionParticle.get();
+ default:
+ return false;
+ }
+ }
+
+ /** @return The current value of seriesPartitionSlotsCount for this StorageGroup */
+ public int getSlotsCount() {
+ return seriesPartitionSlotsCount.get();
+ }
+
/**
* Thread-safely get SchemaPartition within the specific StorageGroup
*
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 33e724be70..dc00336ec5 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -191,9 +191,9 @@ public class ConfigNodeRPCServiceProcessor implements
ConfigIService.Iface {
ConfigNodeDescriptor.getInstance().getConf().getTimePartitionInterval());
}
- // Mark the StorageGroup as SchemaRegions and DataRegions not yet created
- storageGroupSchema.setMaximumSchemaRegionCount(0);
- storageGroupSchema.setMaximumDataRegionCount(0);
+ // Initialize the maxSchemaRegionGroupCount and maxDataRegionGroupCount as 0
+ storageGroupSchema.setMaxSchemaRegionGroupCount(0);
+ storageGroupSchema.setMaxDataRegionGroupCount(0);
SetStorageGroupReq setReq = new SetStorageGroupReq(storageGroupSchema);
TSStatus resp = configManager.setStorageGroup(setReq);
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java
index 1bea05b173..2fb73fa1b2 100644
---
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java
@@ -41,6 +41,7 @@ import
org.apache.iotdb.confignode.consensus.request.read.GetOrCreateSchemaParti
import
org.apache.iotdb.confignode.consensus.request.read.GetRegionLocationsReq;
import
org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionReq;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupReq;
+import
org.apache.iotdb.confignode.consensus.request.write.AdjustMaxRegionGroupCountReq;
import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodeReq;
import
org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionReq;
import org.apache.iotdb.confignode.consensus.request.write.CreateRegionsReq;
@@ -59,6 +60,7 @@ import
org.apache.iotdb.confignode.consensus.request.write.UpdateProcedureReq;
import org.apache.iotdb.confignode.procedure.Procedure;
import org.apache.iotdb.confignode.procedure.impl.DeleteStorageGroupProcedure;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.junit.Assert;
import org.junit.Test;
@@ -158,6 +160,18 @@ public class ConfigRequestSerDeTest {
Assert.assertEquals(req0, req1);
}
+ @Test
+ public void AdjustMaxRegionGroupCountReqTest() throws IOException {
+ AdjustMaxRegionGroupCountReq req0 = new AdjustMaxRegionGroupCountReq();
+ for (int i = 0; i < 3; i++) {
+ req0.putEntry("root.sg" + i, new Pair<>(i, i));
+ }
+
+ AdjustMaxRegionGroupCountReq req1 =
+ (AdjustMaxRegionGroupCountReq)
ConfigRequest.Factory.create(req0.serializeToByteBuffer());
+ Assert.assertEquals(req0, req1);
+ }
+
@Test
public void CountStorageGroupReqTest() throws IOException {
CountStorageGroupReq req0 = new CountStorageGroupReq(Arrays.asList("root",
"sg"));
@@ -193,12 +207,12 @@ public class ConfigRequestSerDeTest {
TRegionReplicaSet dataRegionSet = new TRegionReplicaSet();
dataRegionSet.setRegionId(new
TConsensusGroupId(TConsensusGroupType.DataRegion, 0));
dataRegionSet.setDataNodeLocations(Collections.singletonList(dataNodeLocation));
- req0.addRegion("root.sg0", dataRegionSet);
+ req0.addRegionGroup("root.sg0", dataRegionSet);
TRegionReplicaSet schemaRegionSet = new TRegionReplicaSet();
schemaRegionSet.setRegionId(new
TConsensusGroupId(TConsensusGroupType.SchemaRegion, 1));
schemaRegionSet.setDataNodeLocations(Collections.singletonList(dataNodeLocation));
- req0.addRegion("root.sg1", schemaRegionSet);
+ req0.addRegionGroup("root.sg1", schemaRegionSet);
CreateRegionsReq req1 =
(CreateRegionsReq)
ConfigRequest.Factory.create(req0.serializeToByteBuffer());
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
index 1e24f8f1ed..756d2643f4 100644
---
a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
@@ -100,24 +100,24 @@ public class PartitionInfoTest {
partitionInfo.setStorageGroup(new SetStorageGroupReq(new
TStorageGroupSchema("root.test")));
// Create a SchemaRegion
- CreateRegionsReq createRegionsReq = new CreateRegionsReq();
+ CreateRegionsReq createRegionGroupsReq = new CreateRegionsReq();
TRegionReplicaSet schemaRegionReplicaSet =
generateTRegionReplicaSet(
testFlag.SchemaPartition.getFlag(),
generateTConsensusGroupId(
testFlag.SchemaPartition.getFlag(),
TConsensusGroupType.SchemaRegion));
- createRegionsReq.addRegion("root.test", schemaRegionReplicaSet);
- partitionInfo.createRegions(createRegionsReq);
+ createRegionGroupsReq.addRegionGroup("root.test", schemaRegionReplicaSet);
+ partitionInfo.createRegionGroups(createRegionGroupsReq);
// Create a DataRegion
- createRegionsReq = new CreateRegionsReq();
+ createRegionGroupsReq = new CreateRegionsReq();
TRegionReplicaSet dataRegionReplicaSet =
generateTRegionReplicaSet(
testFlag.DataPartition.getFlag(),
generateTConsensusGroupId(
testFlag.DataPartition.getFlag(),
TConsensusGroupType.DataRegion));
- createRegionsReq.addRegion("root.test", dataRegionReplicaSet);
- partitionInfo.createRegions(createRegionsReq);
+ createRegionGroupsReq.addRegionGroup("root.test", dataRegionReplicaSet);
+ partitionInfo.createRegionGroups(createRegionGroupsReq);
// Create a SchemaPartition
CreateSchemaPartitionReq createSchemaPartitionReq =
@@ -159,8 +159,8 @@ public class PartitionInfoTest {
testFlag.SchemaPartition.getFlag(),
generateTConsensusGroupId(
testFlag.SchemaPartition.getFlag(),
TConsensusGroupType.SchemaRegion));
- createRegionsReq.addRegion("root.test", schemaRegionReplicaSet);
- partitionInfo.createRegions(createRegionsReq);
+ createRegionsReq.addRegionGroup("root.test", schemaRegionReplicaSet);
+ partitionInfo.createRegionGroups(createRegionsReq);
// Create a DataRegion
createRegionsReq = new CreateRegionsReq();
@@ -169,8 +169,8 @@ public class PartitionInfoTest {
testFlag.DataPartition.getFlag(),
generateTConsensusGroupId(
testFlag.DataPartition.getFlag(),
TConsensusGroupType.DataRegion));
- createRegionsReq.addRegion("root.test", dataRegionReplicaSet);
- partitionInfo.createRegions(createRegionsReq);
+ createRegionsReq.addRegionGroup("root.test", dataRegionReplicaSet);
+ partitionInfo.createRegionGroups(createRegionsReq);
GetRegionLocationsReq regionReq = new GetRegionLocationsReq();
regionReq.setRegionType(null);
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift
b/thrift-confignode/src/main/thrift/confignode.thrift
index 2e539b6c4a..64163e83b8 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -106,8 +106,8 @@ struct TStorageGroupSchema {
3: optional i32 schemaReplicationFactor
4: optional i32 dataReplicationFactor
5: optional i64 timePartitionInterval
- 6: optional i32 maximumSchemaRegionCount
- 7: optional i32 maximumDataRegionCount
+ 6: optional i32 maxSchemaRegionGroupCount
+ 7: optional i32 maxDataRegionGroupCount
}
// Schema