This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0133c139538 [IOTDB-6111] Fix Region creation bugs (#10844)
0133c139538 is described below
commit 0133c13953837f9acc5078b0486496429fe89bb1
Author: YongzaoDan <[email protected]>
AuthorDate: Mon Aug 14 23:10:36 2023 +0800
[IOTDB-6111] Fix Region creation bugs (#10844)
---
.../it/partition/IoTDBPartitionCreationIT.java | 127 +++++++++++++--------
.../manager/load/balancer/RegionBalancer.java | 17 +--
.../manager/load/cache/region/RegionCache.java | 5 +-
.../thrift/impl/DataNodeRegionManager.java | 10 ++
4 files changed, 100 insertions(+), 59 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionCreationIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionCreationIT.java
index ee2c8b62e83..b546c80dd30 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionCreationIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionCreationIT.java
@@ -115,7 +115,10 @@ public class IoTDBPartitionCreationIT {
public void testPartitionAllocation() throws Exception {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
- // Status: Running, Running, Running, Region: [0], [0], [0]
+ // Current cluster: 1C3D
+ // Create 1 DataPartition to extend 1 DataRegionGroup
+ // DataNode status: Running, Running, Running
+ // Region distribution: [0], [0], [0]
Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap =
ConfigNodeTestUtils.constructPartitionSlotsMap(
sg,
@@ -154,7 +157,10 @@ public class IoTDBPartitionCreationIT {
testTimePartitionInterval,
dataPartitionTableResp.getDataPartitionTable());
- // Status: Running, Running, Removing, Region: [0], [0], [0]
+ // Current cluster: 1C3D
+ // Set 1 DataNode to Removing status
+ // DataNode status: Running, Running, Removing
+ // Region distribution: [0], [0], [0]
TSetDataNodeStatusReq setDataNodeStatusReq = new TSetDataNodeStatusReq();
DataNodeWrapper dataNodeWrapper =
EnvFactory.getEnv().getDataNodeWrapper(2);
setDataNodeStatusReq.setTargetDataNode(
@@ -184,7 +190,11 @@ public class IoTDBPartitionCreationIT {
TimeUnit.SECONDS.sleep(1);
}
- // Status: Running, Running, Removing, Running, RegionGroup: [0, 1], [0,
1], [0], [1]
+ // Register 1 DataNode and Create 1 DataPartition to extend 1
DataRegionGroup
+ // The new DataRegions wouldn't be allocated to the Removing DataNode
+ // Current cluster: 1C4D
+ // DataNode status: Running, Running, Removing, Running
+ // Region distribution: [0, 1], [0, 1], [0], [1]
EnvFactory.getEnv().registerNewDataNode(true);
partitionSlotsMap =
ConfigNodeTestUtils.constructPartitionSlotsMap(
@@ -223,8 +233,10 @@ public class IoTDBPartitionCreationIT {
testTimePartitionInterval,
dataPartitionTableResp.getDataPartitionTable());
- // Status: Running, Running, Removing, ReadOnly, RegionGroup: [0, 1],
[0, 1],
- // [0], [1]
+ // Current cluster: 1C4D
+ // Set 1 DataNode to ReadOnly status
+ // DataNode status: Running, Running, Removing, ReadOnly
+ // Region distribution: [0, 1], [0, 1], [0], [1]
setDataNodeStatusReq = new TSetDataNodeStatusReq();
dataNodeWrapper = EnvFactory.getEnv().getDataNodeWrapper(3);
setDataNodeStatusReq.setTargetDataNode(
@@ -254,8 +266,11 @@ public class IoTDBPartitionCreationIT {
TimeUnit.SECONDS.sleep(1);
}
- // Status: Running, Running, Removing, ReadOnly, Running, RegionGroup:
[0, 1, 2], [0, 1, 2],
- // [0], [1], [2]
+ // Register 1 DataNode and Create 1 DataPartition to extend 1
DataRegionGroup
+ // The new DataRegions wouldn't be allocated to the Removing and
ReadOnly DataNode
+ // Current cluster: 1C5D
+ // DataNode status: Running, Running, Removing, ReadOnly, Running
+ // Region distribution: [0, 1, 2], [0, 1, 2], [0], [1], [2]
EnvFactory.getEnv().registerNewDataNode(true);
partitionSlotsMap =
ConfigNodeTestUtils.constructPartitionSlotsMap(
@@ -294,8 +309,10 @@ public class IoTDBPartitionCreationIT {
testTimePartitionInterval,
dataPartitionTableResp.getDataPartitionTable());
- // Status: Running, Running, Removing, ReadOnly, Unknown,
RegionGroup:[0, 1, 2], [0, 1, 2],
- // [0], [1], [2]
+ // Shutdown 1 DataNode
+ // Current cluster: 1C5D
+ // DataNode status: Running, Running, Removing, ReadOnly, Unknown
+ // Region distribution: [0, 1, 2], [0, 1, 2], [0], [1], [2]
EnvFactory.getEnv().shutdownDataNode(4);
// Wait for shutdown check
while (true) {
@@ -316,10 +333,13 @@ public class IoTDBPartitionCreationIT {
TimeUnit.SECONDS.sleep(1);
}
- // Status: Running, Running, Removing, ReadOnly, Unknown, Running,
- // RegionGroup: [0, 1, 2, 3], [0, 1, 2, 3], [0], [1], [2], [3]
+ // Register 1 DataNode and Create 1 DataPartition to extend 1
DataRegionGroup
+ // The new DataRegions wouldn't be allocated to the Removing and
ReadOnly DataNode
+ // But the new DataRegion can be allocated to the Unknown DataNode
+ // Current cluster: 1C6D
+ // Status: Running, Running, Removing, ReadOnly, Unknown, Running
+ // RegionGroup: [0, 1, 2, 3], [0, 1, 2], [0], [1], [2, 3], [3]
EnvFactory.getEnv().registerNewDataNode(false);
-
// Use thread sleep to replace verifying because the Unknown DataNode
can not pass the
// connection check
TimeUnit.SECONDS.sleep(25);
@@ -379,47 +399,60 @@ public class IoTDBPartitionCreationIT {
readOnlyCnt += 1;
}
}
- Assert.assertEquals(9, runningCnt);
+ Assert.assertEquals(8, runningCnt);
Assert.assertEquals(1, removingCnt);
Assert.assertEquals(1, readOnlyCnt);
- Assert.assertEquals(1, unknownCnt);
+ Assert.assertEquals(2, unknownCnt);
- partitionSlotsMap =
- ConfigNodeTestUtils.constructPartitionSlotsMap(
- sg,
- 4,
- 4 + testSeriesPartitionBatchSize,
- 4,
- 4 + testTimePartitionBatchSize,
- testTimePartitionInterval);
- dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
- for (int retry = 0; retry < 5; retry++) {
- // Build new Client since it's unstable in Win8 environment
- try (SyncConfigNodeIServiceClient configNodeClient =
- (SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
- dataPartitionTableResp =
configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
- if (dataPartitionTableResp != null) {
- break;
+ // Restart 1 DataNode
+ // Current cluster: 1C6D
+ // Status: Running, Running, Removing, ReadOnly, Running, Running
+ // RegionGroup: [0, 1, 2, 3], [0, 1, 2], [0], [1], [2, 3], [3]
+ EnvFactory.getEnv().startDataNode(4);
+ // Wait for restart check
+ while (true) {
+ AtomicBoolean containUnknown = new AtomicBoolean(false);
+ TShowDataNodesResp showDataNodesResp = client.showDataNodes();
+ showDataNodesResp
+ .getDataNodesInfoList()
+ .forEach(
+ dataNodeInfo -> {
+ if
(NodeStatus.Unknown.getStatus().equals(dataNodeInfo.getStatus())) {
+ containUnknown.set(true);
+ }
+ });
+
+ if (!containUnknown.get()) {
+ break;
+ }
+ TimeUnit.SECONDS.sleep(1);
+ }
+ // Check Region count and status
+ for (int i = 0; i < 10; i++) {
+ runningCnt = 0;
+ unknownCnt = 0;
+ readOnlyCnt = 0;
+ removingCnt = 0;
+ showRegionResp = client.showRegion(new TShowRegionReq());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
showRegionResp.getStatus().getCode());
+ for (TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
+ if (RegionStatus.Running.getStatus().equals(regionInfo.getStatus()))
{
+ runningCnt += 1;
+ } else if
(RegionStatus.Unknown.getStatus().equals(regionInfo.getStatus())) {
+ unknownCnt += 1;
+ } else if
(RegionStatus.Removing.getStatus().equals(regionInfo.getStatus())) {
+ removingCnt += 1;
+ } else if
(RegionStatus.ReadOnly.getStatus().equals(regionInfo.getStatus())) {
+ readOnlyCnt += 1;
}
- } catch (Exception e) {
- // Retry sometimes in order to avoid request timeout
- LOGGER.error(e.getMessage());
- TimeUnit.SECONDS.sleep(1);
}
+ if (runningCnt == 10 && unknownCnt == 0 && readOnlyCnt == 1 &&
removingCnt == 1) {
+ return;
+ }
+ TimeUnit.SECONDS.sleep(1);
}
- Assert.assertNotNull(dataPartitionTableResp);
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(),
- dataPartitionTableResp.getStatus().getCode());
- Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
- ConfigNodeTestUtils.checkDataPartitionTable(
- sg,
- 4,
- 4 + testSeriesPartitionBatchSize,
- 4,
- 4 + testTimePartitionBatchSize,
- testTimePartitionInterval,
- dataPartitionTableResp.getDataPartitionTable());
+ Assert.fail("Region status is not correct after 10s of recovery");
}
}
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
index 31cb976f3b1..c9965fe036c 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -74,18 +74,15 @@ public class RegionBalancer {
Map<String, Integer> allotmentMap, TConsensusGroupType
consensusGroupType)
throws NotEnoughDataNodeException, DatabaseNotExistsException {
- // The new RegionGroups will occupy online DataNodes firstly
- List<TDataNodeConfiguration> onlineDataNodes =
- getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running);
// Some new RegionGroups will have to occupy unknown DataNodes
// if the number of online DataNodes is insufficient
List<TDataNodeConfiguration> availableDataNodes =
getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running,
NodeStatus.Unknown);
// Make sure the number of available DataNodes is enough for allocating
new RegionGroups
- for (String storageGroup : allotmentMap.keySet()) {
+ for (String database : allotmentMap.keySet()) {
int replicationFactor =
- getClusterSchemaManager().getReplicationFactor(storageGroup,
consensusGroupType);
+ getClusterSchemaManager().getReplicationFactor(database,
consensusGroupType);
if (availableDataNodes.size() < replicationFactor) {
throw new NotEnoughDataNodeException();
}
@@ -97,18 +94,16 @@ public class RegionBalancer {
getPartitionManager().getAllReplicaSets(consensusGroupType);
for (Map.Entry<String, Integer> entry : allotmentMap.entrySet()) {
- String storageGroup = entry.getKey();
+ String database = entry.getKey();
int allotment = entry.getValue();
int replicationFactor =
- getClusterSchemaManager().getReplicationFactor(storageGroup,
consensusGroupType);
- List<TDataNodeConfiguration> targetDataNodes =
- onlineDataNodes.size() >= replicationFactor ? onlineDataNodes :
availableDataNodes;
+ getClusterSchemaManager().getReplicationFactor(database,
consensusGroupType);
for (int i = 0; i < allotment; i++) {
// Prepare input data
Map<Integer, TDataNodeConfiguration> availableDataNodeMap = new
ConcurrentHashMap<>();
Map<Integer, Double> freeDiskSpaceMap = new ConcurrentHashMap<>();
- targetDataNodes.forEach(
+ availableDataNodes.forEach(
dataNodeConfiguration -> {
int dataNodeId =
dataNodeConfiguration.getLocation().getDataNodeId();
availableDataNodeMap.put(dataNodeId, dataNodeConfiguration);
@@ -124,7 +119,7 @@ public class RegionBalancer {
replicationFactor,
new TConsensusGroupId(
consensusGroupType,
getPartitionManager().generateNextRegionGroupId()));
- createRegionGroupsPlan.addRegionGroup(storageGroup, newRegionGroup);
+ createRegionGroupsPlan.addRegionGroup(database, newRegionGroup);
// Mark the new RegionGroup as allocated
allocatedRegionGroups.add(newRegionGroup);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
index a395c10a3e8..0bce99e0e89 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
@@ -59,7 +59,10 @@ public class RegionCache {
// TODO: Optimize judge logic
RegionStatus status;
- if (System.currentTimeMillis() - lastSample.getSendTimestamp() >
HEARTBEAT_TIMEOUT_TIME) {
+ if (RegionStatus.Removing.equals(lastSample.getStatus())) {
+ status = RegionStatus.Removing;
+ } else if (System.currentTimeMillis() - lastSample.getSendTimestamp()
+ > HEARTBEAT_TIMEOUT_TIME) {
status = RegionStatus.Unknown;
} else {
status = lastSample.getStatus();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java
index b2a6cb4b94d..55c4e3bb87d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
+import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.exception.DataRegionException;
@@ -121,6 +122,11 @@ public class DataNodeRegionManager {
SchemaRegionConsensusImpl.getInstance().createPeer(schemaRegionId,
peers);
if (consensusGenericResponse.isSuccess()) {
tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ } else if (consensusGenericResponse.getException()
+ instanceof ConsensusGroupAlreadyExistException) {
+ tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ tsStatus.setMessage(
+ String.format("SchemaRegion %d already exists.",
schemaRegionId.getId()));
} else {
tsStatus = new
TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode());
tsStatus.setMessage(consensusGenericResponse.getException().getMessage());
@@ -157,6 +163,10 @@ public class DataNodeRegionManager {
DataRegionConsensusImpl.getInstance().createPeer(dataRegionId,
peers);
if (consensusGenericResponse.isSuccess()) {
tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ } else if (consensusGenericResponse.getException()
+ instanceof ConsensusGroupAlreadyExistException) {
+ tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ tsStatus.setMessage(String.format("DataRegion %d already exists.",
dataRegionId.getId()));
} else {
tsStatus = new
TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode());
tsStatus.setMessage(consensusGenericResponse.getException().getMessage());