This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d46ceb4a05 [IOTDB-6291] Update RegionGroup configuration when update 
DataNode configuration (#11914)
6d46ceb4a05 is described below

commit 6d46ceb4a053b12b6ebd01c11c654c2cf4f86c82
Author: Yongzao <[email protected]>
AuthorDate: Wed Jan 17 17:34:48 2024 +0800

    [IOTDB-6291] Update RegionGroup configuration when update DataNode 
configuration (#11914)
---
 .../it/cluster/IoTDBClusterRestartIT.java          | 70 ++++++++++++++++++++++
 .../persistence/executor/ConfigPlanExecutor.java   |  9 ++-
 .../partition/DatabasePartitionTable.java          | 10 ++++
 .../persistence/partition/PartitionInfo.java       | 19 ++++++
 .../persistence/partition/RegionGroup.java         | 15 +++++
 5 files changed, 121 insertions(+), 2 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java
index 8b22780f8d2..8e5e17b17bf 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java
@@ -19,11 +19,19 @@
 
 package org.apache.iotdb.confignode.it.cluster;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
 import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
 import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.env.cluster.EnvUtils;
 import org.apache.iotdb.it.env.cluster.config.MppBaseConfig;
@@ -33,9 +41,11 @@ import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.TException;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -46,7 +56,9 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;
 
@@ -59,6 +71,8 @@ public class IoTDBClusterRestartIT {
 
   private static final int testConfigNodeNum = 3, testDataNodeNum = 2;
 
+  private static final long testTimePartitionInterval = 604800000;
+
   @Before
   public void setUp() {
     EnvFactory.getEnv()
@@ -114,6 +128,45 @@ public class IoTDBClusterRestartIT {
   @Test
   public void clusterRestartAfterUpdateDataNodeTest()
       throws InterruptedException, ClientManagerException, IOException, 
TException {
+    // Create default Database
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+      TSStatus status = client.setDatabase(new 
TDatabaseSchema("root.database"));
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
+    }
+    // Create some DataPartitions to extend 2 DataRegionGroups
+    Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap =
+        ConfigNodeTestUtils.constructPartitionSlotsMap(
+            "root.database", 0, 10, 0, 10, testTimePartitionInterval);
+    TDataPartitionReq dataPartitionReq = new 
TDataPartitionReq(partitionSlotsMap);
+    TDataPartitionTableResp dataPartitionTableResp = null;
+    for (int retry = 0; retry < 5; retry++) {
+      // Build new Client since it's unstable in Win8 environment
+      try (SyncConfigNodeIServiceClient configNodeClient =
+          (SyncConfigNodeIServiceClient) 
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+        dataPartitionTableResp = 
configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
+        if (dataPartitionTableResp != null) {
+          break;
+        }
+      } catch (Exception e) {
+        // Retry sometimes in order to avoid request timeout
+        logger.error(e.getMessage());
+        TimeUnit.SECONDS.sleep(1);
+      }
+    }
+    Assert.assertNotNull(dataPartitionTableResp);
+    Assert.assertEquals(
+        TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
dataPartitionTableResp.getStatus().getCode());
+    Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
+    ConfigNodeTestUtils.checkDataPartitionTable(
+        "root.database",
+        0,
+        10,
+        0,
+        10,
+        testTimePartitionInterval,
+        dataPartitionTableResp.getDataPartitionTable());
+
     // Shutdown all DataNodes
     for (int i = 0; i < testDataNodeNum; i++) {
       EnvFactory.getEnv().shutdownDataNode(i);
@@ -151,12 +204,29 @@ public class IoTDBClusterRestartIT {
     // Check DataNode EndPoint
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+      // Check update in NodeInfo
       TShowClusterResp showClusterResp = client.showCluster();
       ConfigNodeTestUtils.checkNodeConfig(
           showClusterResp.getConfigNodeList(),
           showClusterResp.getDataNodeList(),
           EnvFactory.getEnv().getConfigNodeWrapperList(),
           dataNodeWrapperList);
+
+      // Check update in PartitionInfo
+      TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq());
+      showRegionResp
+          .getRegionInfoList()
+          .forEach(
+              regionInfo -> {
+                AtomicBoolean matched = new AtomicBoolean(false);
+                dataNodeWrapperList.forEach(
+                    dataNodeWrapper -> {
+                      if (regionInfo.getClientRpcPort() == 
dataNodeWrapper.getPort()) {
+                        matched.set(true);
+                      }
+                    });
+                Assert.assertTrue(matched.get());
+              });
     }
   }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 3a9994848c3..a46a9e213c1 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -286,15 +286,20 @@ public class ConfigPlanExecutor {
 
   public TSStatus executeNonQueryPlan(ConfigPhysicalPlan physicalPlan)
       throws UnknownPhysicalPlanTypeException {
+    TSStatus status;
     switch (physicalPlan.getType()) {
       case RegisterDataNode:
         return nodeInfo.registerDataNode((RegisterDataNodePlan) physicalPlan);
       case RemoveDataNode:
         return nodeInfo.removeDataNode((RemoveDataNodePlan) physicalPlan);
       case UpdateDataNodeConfiguration:
-        return nodeInfo.updateDataNode((UpdateDataNodePlan) physicalPlan);
+        status = nodeInfo.updateDataNode((UpdateDataNodePlan) physicalPlan);
+        if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          return status;
+        }
+        return partitionInfo.updateDataNode((UpdateDataNodePlan) physicalPlan);
       case CreateDatabase:
-        TSStatus status = 
clusterSchemaInfo.createDatabase((DatabaseSchemaPlan) physicalPlan);
+        status = clusterSchemaInfo.createDatabase((DatabaseSchemaPlan) 
physicalPlan);
         if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
           return status;
         }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
index 8ba1eac0b07..5a4971b9699 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
@@ -89,6 +89,16 @@ public class DatabasePartitionTable {
     this.preDeleted = preDeleted;
   }
 
+  /**
+   * Forward the new DataNodeLocation to every RegionGroup held in regionGroupMap.
+   *
+   * @param newDataNodeLocation The new DataNodeLocation, passed unchanged to each RegionGroup.
+   */
+  public void updateDataNode(TDataNodeLocation newDataNodeLocation) {
+    regionGroupMap.forEach(
+        (regionGroupId, regionGroup) -> 
regionGroup.updateDataNode(newDataNodeLocation));
+  }
+
   /**
    * Cache allocation result of new RegionGroups.
    *
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index 979ebd7875d..16317152dee 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -39,6 +39,7 @@ import 
org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoLi
 import 
org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.database.DeleteDatabasePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.database.PreDeleteDatabasePlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan;
@@ -139,6 +140,24 @@ public class PartitionInfo implements SnapshotProcessor {
   // Consensus read/write interfaces
   // ======================================================
 
+  /**
+   * Update the DataNodeLocation in each DatabasePartitionTable whose database still exists.
+   *
+   * @param updateDataNodePlan plan whose DataNodeConfiguration carries the new DataNodeLocation
+   * @return a TSStatus with {@link TSStatusCode#SUCCESS_STATUS}; no other code is produced here
+   */
+  public TSStatus updateDataNode(UpdateDataNodePlan updateDataNodePlan) {
+    TDataNodeLocation newDataNodeLocation =
+        updateDataNodePlan.getDataNodeConfiguration().getLocation();
+    databasePartitionTables.forEach(
+        (database, databasePartitionTable) -> {
+          if (isDatabaseExisted(database)) {
+            databasePartitionTable.updateDataNode(newDataNodeLocation);
+          }
+        });
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+
   /**
    * Thread-safely create new DatabasePartitionTable.
    *
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
index 81fea013c21..f02b6624d4f 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
@@ -75,6 +75,21 @@ public class RegionGroup {
     return replicaSet.deepCopy();
   }
 
+  /**
+   * Replace the first replica location whose DataNodeId equals the given one; no-op if absent.
+   *
+   * @param newDataNodeLocation The new DataNodeLocation, matched to a replica by its DataNodeId.
+   */
+  public void updateDataNode(TDataNodeLocation newDataNodeLocation) {
+    for (int i = 0; i < replicaSet.getDataNodeLocationsSize(); i++) {
+      if (replicaSet.getDataNodeLocations().get(i).getDataNodeId()
+          == newDataNodeLocation.getDataNodeId()) {
+        replicaSet.getDataNodeLocations().set(i, newDataNodeLocation);
+        return;
+      }
+    }
+  }
+
   public void addRegionLocation(TDataNodeLocation node) {
     replicaSet.addToDataNodeLocations(node);
     replicaSet.getDataNodeLocations().sort(TDataNodeLocation::compareTo);

Reply via email to