This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0133c139538 [IOTDB-6111] Fix Region creation bugs (#10844)
0133c139538 is described below

commit 0133c13953837f9acc5078b0486496429fe89bb1
Author: YongzaoDan <[email protected]>
AuthorDate: Mon Aug 14 23:10:36 2023 +0800

    [IOTDB-6111] Fix Region creation bugs (#10844)
---
 .../it/partition/IoTDBPartitionCreationIT.java     | 127 +++++++++++++--------
 .../manager/load/balancer/RegionBalancer.java      |  17 +--
 .../manager/load/cache/region/RegionCache.java     |   5 +-
 .../thrift/impl/DataNodeRegionManager.java         |  10 ++
 4 files changed, 100 insertions(+), 59 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionCreationIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionCreationIT.java
index ee2c8b62e83..b546c80dd30 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionCreationIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionCreationIT.java
@@ -115,7 +115,10 @@ public class IoTDBPartitionCreationIT {
   public void testPartitionAllocation() throws Exception {
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
-      // Status: Running, Running, Running, Region: [0], [0], [0]
+      // Current cluster: 1C3D
+      // Create 1 DataPartition to extend 1 DataRegionGroup
+      // DataNode status: Running, Running, Running
+      // Region distribution: [0], [0], [0]
       Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap =
           ConfigNodeTestUtils.constructPartitionSlotsMap(
               sg,
@@ -154,7 +157,10 @@ public class IoTDBPartitionCreationIT {
           testTimePartitionInterval,
           dataPartitionTableResp.getDataPartitionTable());
 
-      // Status: Running, Running, Removing, Region: [0], [0], [0]
+      // Current cluster: 1C3D
+      // Set 1 DataNode to Removing status
+      // DataNode status: Running, Running, Removing
+      // Region distribution: [0], [0], [0]
       TSetDataNodeStatusReq setDataNodeStatusReq = new TSetDataNodeStatusReq();
       DataNodeWrapper dataNodeWrapper = 
EnvFactory.getEnv().getDataNodeWrapper(2);
       setDataNodeStatusReq.setTargetDataNode(
@@ -184,7 +190,11 @@ public class IoTDBPartitionCreationIT {
         TimeUnit.SECONDS.sleep(1);
       }
 
-      // Status: Running, Running, Removing, Running, RegionGroup: [0, 1], [0, 
1], [0], [1]
+      // Register 1 DataNode and Create 1 DataPartition to extend 1 DataRegionGroup
+      // The new DataRegions wouldn't be allocated to the Removing DataNode
+      // Current cluster: 1C4D
+      // DataNode status: Running, Running, Removing, Running
+      // Region distribution: [0, 1], [0, 1], [0], [1]
       EnvFactory.getEnv().registerNewDataNode(true);
       partitionSlotsMap =
           ConfigNodeTestUtils.constructPartitionSlotsMap(
@@ -223,8 +233,10 @@ public class IoTDBPartitionCreationIT {
           testTimePartitionInterval,
           dataPartitionTableResp.getDataPartitionTable());
 
-      // Status: Running, Running, Removing, ReadOnly, RegionGroup: [0, 1], 
[0, 1],
-      // [0], [1]
+      // Current cluster: 1C4D
+      // Set 1 DataNode to ReadOnly status
+      // DataNode status: Running, Running, Removing, ReadOnly
+      // Region distribution: [0, 1], [0, 1], [0], [1]
       setDataNodeStatusReq = new TSetDataNodeStatusReq();
       dataNodeWrapper = EnvFactory.getEnv().getDataNodeWrapper(3);
       setDataNodeStatusReq.setTargetDataNode(
@@ -254,8 +266,11 @@ public class IoTDBPartitionCreationIT {
         TimeUnit.SECONDS.sleep(1);
       }
 
-      // Status: Running, Running, Removing, ReadOnly, Running, RegionGroup: 
[0, 1, 2], [0, 1, 2],
-      // [0], [1], [2]
+      // Register 1 DataNode and Create 1 DataPartition to extend 1 DataRegionGroup
+      // The new DataRegions wouldn't be allocated to the Removing and ReadOnly DataNodes
+      // Current cluster: 1C5D
+      // DataNode status: Running, Running, Removing, ReadOnly, Running
+      // Region distribution: [0, 1, 2], [0, 1, 2], [0], [1], [2]
       EnvFactory.getEnv().registerNewDataNode(true);
       partitionSlotsMap =
           ConfigNodeTestUtils.constructPartitionSlotsMap(
@@ -294,8 +309,10 @@ public class IoTDBPartitionCreationIT {
           testTimePartitionInterval,
           dataPartitionTableResp.getDataPartitionTable());
 
-      // Status: Running, Running, Removing, ReadOnly, Unknown, 
RegionGroup:[0, 1, 2], [0, 1, 2],
-      // [0], [1], [2]
+      // Shutdown 1 DataNode
+      // Current cluster: 1C5D
+      // DataNode status: Running, Running, Removing, ReadOnly, Unknown
+      // Region distribution: [0, 1, 2], [0, 1, 2], [0], [1], [2]
       EnvFactory.getEnv().shutdownDataNode(4);
       // Wait for shutdown check
       while (true) {
@@ -316,10 +333,13 @@ public class IoTDBPartitionCreationIT {
         TimeUnit.SECONDS.sleep(1);
       }
 
-      // Status: Running, Running, Removing, ReadOnly, Unknown, Running,
-      // RegionGroup: [0, 1, 2, 3], [0, 1, 2, 3], [0], [1], [2], [3]
+      // Register 1 DataNode and Create 1 DataPartition to extend 1 DataRegionGroup
+      // The new DataRegions wouldn't be allocated to the Removing and ReadOnly DataNodes
+      // But the new DataRegion can be allocated to the Unknown DataNode
+      // Current cluster: 1C6D
+      // Status: Running, Running, Removing, ReadOnly, Unknown, Running
+      // RegionGroup: [0, 1, 2, 3], [0, 1, 2], [0], [1], [2, 3], [3]
       EnvFactory.getEnv().registerNewDataNode(false);
-
       // Use thread sleep to replace verifying because the Unknown DataNode 
can not pass the
       // connection check
       TimeUnit.SECONDS.sleep(25);
@@ -379,47 +399,60 @@ public class IoTDBPartitionCreationIT {
           readOnlyCnt += 1;
         }
       }
-      Assert.assertEquals(9, runningCnt);
+      Assert.assertEquals(8, runningCnt);
       Assert.assertEquals(1, removingCnt);
       Assert.assertEquals(1, readOnlyCnt);
-      Assert.assertEquals(1, unknownCnt);
+      Assert.assertEquals(2, unknownCnt);
 
-      partitionSlotsMap =
-          ConfigNodeTestUtils.constructPartitionSlotsMap(
-              sg,
-              4,
-              4 + testSeriesPartitionBatchSize,
-              4,
-              4 + testTimePartitionBatchSize,
-              testTimePartitionInterval);
-      dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
-      for (int retry = 0; retry < 5; retry++) {
-        // Build new Client since it's unstable in Win8 environment
-        try (SyncConfigNodeIServiceClient configNodeClient =
-            (SyncConfigNodeIServiceClient) 
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
-          dataPartitionTableResp = 
configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
-          if (dataPartitionTableResp != null) {
-            break;
+      // Restart 1 DataNode
+      // Current cluster: 1C6D
+      // Status: Running, Running, Removing, ReadOnly, Running, Running
+      // RegionGroup: [0, 1, 2, 3], [0, 1, 2], [0], [1], [2, 3], [3]
+      EnvFactory.getEnv().startDataNode(4);
+      // Wait for restart check
+      while (true) {
+        AtomicBoolean containUnknown = new AtomicBoolean(false);
+        TShowDataNodesResp showDataNodesResp = client.showDataNodes();
+        showDataNodesResp
+            .getDataNodesInfoList()
+            .forEach(
+                dataNodeInfo -> {
+                  if 
(NodeStatus.Unknown.getStatus().equals(dataNodeInfo.getStatus())) {
+                    containUnknown.set(true);
+                  }
+                });
+
+        if (!containUnknown.get()) {
+          break;
+        }
+        TimeUnit.SECONDS.sleep(1);
+      }
+      // Check Region count and status
+      for (int i = 0; i < 10; i++) {
+        runningCnt = 0;
+        unknownCnt = 0;
+        readOnlyCnt = 0;
+        removingCnt = 0;
+        showRegionResp = client.showRegion(new TShowRegionReq());
+        Assert.assertEquals(
+            TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
showRegionResp.getStatus().getCode());
+        for (TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
+          if (RegionStatus.Running.getStatus().equals(regionInfo.getStatus())) 
{
+            runningCnt += 1;
+          } else if 
(RegionStatus.Unknown.getStatus().equals(regionInfo.getStatus())) {
+            unknownCnt += 1;
+          } else if 
(RegionStatus.Removing.getStatus().equals(regionInfo.getStatus())) {
+            removingCnt += 1;
+          } else if 
(RegionStatus.ReadOnly.getStatus().equals(regionInfo.getStatus())) {
+            readOnlyCnt += 1;
           }
-        } catch (Exception e) {
-          // Retry sometimes in order to avoid request timeout
-          LOGGER.error(e.getMessage());
-          TimeUnit.SECONDS.sleep(1);
         }
+        if (runningCnt == 10 && unknownCnt == 0 && readOnlyCnt == 1 && 
removingCnt == 1) {
+          return;
+        }
+        TimeUnit.SECONDS.sleep(1);
       }
-      Assert.assertNotNull(dataPartitionTableResp);
-      Assert.assertEquals(
-          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
-          dataPartitionTableResp.getStatus().getCode());
-      Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
-      ConfigNodeTestUtils.checkDataPartitionTable(
-          sg,
-          4,
-          4 + testSeriesPartitionBatchSize,
-          4,
-          4 + testTimePartitionBatchSize,
-          testTimePartitionInterval,
-          dataPartitionTableResp.getDataPartitionTable());
+      Assert.fail("Region status is not correct after 10s of recovery");
     }
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
index 31cb976f3b1..c9965fe036c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -74,18 +74,15 @@ public class RegionBalancer {
       Map<String, Integer> allotmentMap, TConsensusGroupType 
consensusGroupType)
       throws NotEnoughDataNodeException, DatabaseNotExistsException {
 
-    // The new RegionGroups will occupy online DataNodes firstly
-    List<TDataNodeConfiguration> onlineDataNodes =
-        getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running);
     // Some new RegionGroups will have to occupy unknown DataNodes
     // if the number of online DataNodes is insufficient
     List<TDataNodeConfiguration> availableDataNodes =
         getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running, 
NodeStatus.Unknown);
 
     // Make sure the number of available DataNodes is enough for allocating 
new RegionGroups
-    for (String storageGroup : allotmentMap.keySet()) {
+    for (String database : allotmentMap.keySet()) {
       int replicationFactor =
-          getClusterSchemaManager().getReplicationFactor(storageGroup, 
consensusGroupType);
+          getClusterSchemaManager().getReplicationFactor(database, 
consensusGroupType);
       if (availableDataNodes.size() < replicationFactor) {
         throw new NotEnoughDataNodeException();
       }
@@ -97,18 +94,16 @@ public class RegionBalancer {
         getPartitionManager().getAllReplicaSets(consensusGroupType);
 
     for (Map.Entry<String, Integer> entry : allotmentMap.entrySet()) {
-      String storageGroup = entry.getKey();
+      String database = entry.getKey();
       int allotment = entry.getValue();
       int replicationFactor =
-          getClusterSchemaManager().getReplicationFactor(storageGroup, 
consensusGroupType);
-      List<TDataNodeConfiguration> targetDataNodes =
-          onlineDataNodes.size() >= replicationFactor ? onlineDataNodes : 
availableDataNodes;
+          getClusterSchemaManager().getReplicationFactor(database, 
consensusGroupType);
 
       for (int i = 0; i < allotment; i++) {
         // Prepare input data
         Map<Integer, TDataNodeConfiguration> availableDataNodeMap = new 
ConcurrentHashMap<>();
         Map<Integer, Double> freeDiskSpaceMap = new ConcurrentHashMap<>();
-        targetDataNodes.forEach(
+        availableDataNodes.forEach(
             dataNodeConfiguration -> {
               int dataNodeId = 
dataNodeConfiguration.getLocation().getDataNodeId();
               availableDataNodeMap.put(dataNodeId, dataNodeConfiguration);
@@ -124,7 +119,7 @@ public class RegionBalancer {
                 replicationFactor,
                 new TConsensusGroupId(
                     consensusGroupType, 
getPartitionManager().generateNextRegionGroupId()));
-        createRegionGroupsPlan.addRegionGroup(storageGroup, newRegionGroup);
+        createRegionGroupsPlan.addRegionGroup(database, newRegionGroup);
 
         // Mark the new RegionGroup as allocated
         allocatedRegionGroups.add(newRegionGroup);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
index a395c10a3e8..0bce99e0e89 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
@@ -59,7 +59,10 @@ public class RegionCache {
 
     // TODO: Optimize judge logic
     RegionStatus status;
-    if (System.currentTimeMillis() - lastSample.getSendTimestamp() > 
HEARTBEAT_TIMEOUT_TIME) {
+    if (RegionStatus.Removing.equals(lastSample.getStatus())) {
+      status = RegionStatus.Removing;
+    } else if (System.currentTimeMillis() - lastSample.getSendTimestamp()
+        > HEARTBEAT_TIMEOUT_TIME) {
       status = RegionStatus.Unknown;
     } else {
       status = lastSample.getStatus();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java
index b2a6cb4b94d..55c4e3bb87d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
+import 
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.exception.DataRegionException;
@@ -121,6 +122,11 @@ public class DataNodeRegionManager {
           SchemaRegionConsensusImpl.getInstance().createPeer(schemaRegionId, 
peers);
       if (consensusGenericResponse.isSuccess()) {
         tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+      } else if (consensusGenericResponse.getException()
+          instanceof ConsensusGroupAlreadyExistException) {
+        tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+        tsStatus.setMessage(
+            String.format("SchemaRegion %d already exists.", 
schemaRegionId.getId()));
       } else {
         tsStatus = new 
TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode());
         
tsStatus.setMessage(consensusGenericResponse.getException().getMessage());
@@ -157,6 +163,10 @@ public class DataNodeRegionManager {
           DataRegionConsensusImpl.getInstance().createPeer(dataRegionId, 
peers);
       if (consensusGenericResponse.isSuccess()) {
         tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+      } else if (consensusGenericResponse.getException()
+          instanceof ConsensusGroupAlreadyExistException) {
+        tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+        tsStatus.setMessage(String.format("DataRegion %d already exists.", 
dataRegionId.getId()));
       } else {
         tsStatus = new 
TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode());
         
tsStatus.setMessage(consensusGenericResponse.getException().getMessage());

Reply via email to