This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch datanode-configuration-update
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit bf5d7a807933a55999febae49dd6af679a1fe766
Author: YongzaoDan <[email protected]>
AuthorDate: Wed Jan 17 14:00:11 2024 +0800

    Finish
---
 .../it/cluster/IoTDBClusterRestartIT.java          | 59 ++++++++++++++++++++++
 .../persistence/executor/ConfigPlanExecutor.java   |  9 +++-
 .../partition/DatabasePartitionTable.java          | 10 ++++
 .../persistence/partition/PartitionInfo.java       | 19 +++++++
 .../persistence/partition/RegionGroup.java         | 15 ++++++
 5 files changed, 110 insertions(+), 2 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java
index 8b22780f8d2..7b0d0cd2fe2 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java
@@ -19,11 +19,17 @@
 
 package org.apache.iotdb.confignode.it.cluster;
 
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
 import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.env.cluster.EnvUtils;
 import org.apache.iotdb.it.env.cluster.config.MppBaseConfig;
@@ -33,9 +39,11 @@ import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.TException;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -46,6 +54,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;
@@ -59,6 +68,8 @@ public class IoTDBClusterRestartIT {
 
   private static final int testConfigNodeNum = 3, testDataNodeNum = 2;
 
+  private static final long testTimePartitionInterval = 604800000;
+
   @Before
   public void setUp() {
     EnvFactory.getEnv()
@@ -114,6 +125,39 @@ public class IoTDBClusterRestartIT {
   @Test
   public void clusterRestartAfterUpdateDataNodeTest()
       throws InterruptedException, ClientManagerException, IOException, 
TException {
+    // Create some DataPartitions to extend 2 DataRegionGroups
+    Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap =
+        ConfigNodeTestUtils.constructPartitionSlotsMap(
+            "root.database", 0, 10, 0, 10, testTimePartitionInterval);
+    TDataPartitionReq dataPartitionReq = new 
TDataPartitionReq(partitionSlotsMap);
+    TDataPartitionTableResp dataPartitionTableResp = null;
+    for (int retry = 0; retry < 5; retry++) {
+      // Build new Client since it's unstable in Win8 environment
+      try (SyncConfigNodeIServiceClient configNodeClient =
+          (SyncConfigNodeIServiceClient) 
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+        dataPartitionTableResp = 
configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
+        if (dataPartitionTableResp != null) {
+          break;
+        }
+      } catch (Exception e) {
+        // Retry sometimes in order to avoid request timeout
+        logger.error(e.getMessage());
+        TimeUnit.SECONDS.sleep(1);
+      }
+    }
+    Assert.assertNotNull(dataPartitionTableResp);
+    Assert.assertEquals(
+        TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
dataPartitionTableResp.getStatus().getCode());
+    Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
+    ConfigNodeTestUtils.checkDataPartitionTable(
+        "root.database",
+        0,
+        10,
+        0,
+        10,
+        testTimePartitionInterval,
+        dataPartitionTableResp.getDataPartitionTable());
+
     // Shutdown all DataNodes
     for (int i = 0; i < testDataNodeNum; i++) {
       EnvFactory.getEnv().shutdownDataNode(i);
@@ -151,12 +195,27 @@ public class IoTDBClusterRestartIT {
     // Check DataNode EndPoint
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+      // Check update in NodeInfo
       TShowClusterResp showClusterResp = client.showCluster();
       ConfigNodeTestUtils.checkNodeConfig(
           showClusterResp.getConfigNodeList(),
           showClusterResp.getDataNodeList(),
           EnvFactory.getEnv().getConfigNodeWrapperList(),
           dataNodeWrapperList);
+
+      // Check update in PartitionInfo
+      TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq());
+      showRegionResp
+          .getRegionInfoList()
+          .forEach(
+              regionInfo ->
+                  dataNodeWrapperList.forEach(
+                      dataNodeWrapper -> {
+                        if 
(regionInfo.getClientRpcIp().equals(dataNodeWrapper.getIp())) {
+                          Assert.assertEquals(
+                              dataNodeWrapper.getPort(), 
regionInfo.getClientRpcPort());
+                        }
+                      }));
     }
   }
 
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 3a9994848c3..a46a9e213c1 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -286,15 +286,20 @@ public class ConfigPlanExecutor {
 
   public TSStatus executeNonQueryPlan(ConfigPhysicalPlan physicalPlan)
       throws UnknownPhysicalPlanTypeException {
+    TSStatus status;
     switch (physicalPlan.getType()) {
       case RegisterDataNode:
         return nodeInfo.registerDataNode((RegisterDataNodePlan) physicalPlan);
       case RemoveDataNode:
         return nodeInfo.removeDataNode((RemoveDataNodePlan) physicalPlan);
       case UpdateDataNodeConfiguration:
-        return nodeInfo.updateDataNode((UpdateDataNodePlan) physicalPlan);
+        status = nodeInfo.updateDataNode((UpdateDataNodePlan) physicalPlan);
+        if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          return status;
+        }
+        return partitionInfo.updateDataNode((UpdateDataNodePlan) physicalPlan);
       case CreateDatabase:
-        TSStatus status = 
clusterSchemaInfo.createDatabase((DatabaseSchemaPlan) physicalPlan);
+        status = clusterSchemaInfo.createDatabase((DatabaseSchemaPlan) 
physicalPlan);
         if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
           return status;
         }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
index aaa9d53ccf1..3bca725dcf0 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
@@ -89,6 +89,16 @@ public class DatabasePartitionTable {
     this.preDeleted = preDeleted;
   }
 
+  /**
+   * Forward the given DataNodeLocation to every RegionGroup held in regionGroupMap.
+   *
+   * @param newDataNodeLocation The new DataNodeLocation handed to each RegionGroup.
+   */
+  public void updateDataNode(TDataNodeLocation newDataNodeLocation) {
+    regionGroupMap.forEach(
+        (regionGroupId, regionGroup) -> 
regionGroup.updateDataNode(newDataNodeLocation));
+  }
+
   /**
    * Cache allocation result of new RegionGroups.
    *
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index 979ebd7875d..16317152dee 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -39,6 +39,7 @@ import 
org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoLi
 import 
org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.database.DeleteDatabasePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.database.PreDeleteDatabasePlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan;
@@ -139,6 +140,24 @@ public class PartitionInfo implements SnapshotProcessor {
   // Consensus read/write interfaces
   // ======================================================
 
+  /**
+   * Update the DataNodeLocation in every DatabasePartitionTable that passes isDatabaseExisted.
+   * NOTE(review): no locking is visible here; thread-safety presumably comes from the map - confirm.
+   * @param updateDataNodePlan UpdateDataNodePlan carrying the new DataNodeConfiguration.
+   * @return {@link TSStatusCode#SUCCESS_STATUS} always; this method builds no failure status.
+   */
+  public TSStatus updateDataNode(UpdateDataNodePlan updateDataNodePlan) {
+    TDataNodeLocation newDataNodeLocation =
+        updateDataNodePlan.getDataNodeConfiguration().getLocation();
+    databasePartitionTables.forEach(
+        (database, databasePartitionTable) -> {
+          if (isDatabaseExisted(database)) {
+            databasePartitionTable.updateDataNode(newDataNodeLocation);
+          }
+        });
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+
   /**
    * Thread-safely create new DatabasePartitionTable.
    *
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
index 81fea013c21..f02b6624d4f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
@@ -75,6 +75,21 @@ public class RegionGroup {
     return replicaSet.deepCopy();
   }
 
+  /**
+   * Replace the first replica whose dataNodeId equals the new one; no-op if none matches.
+   *
+   * @param newDataNodeLocation The new DataNodeLocation; matched against replicas by dataNodeId.
+   */
+  public void updateDataNode(TDataNodeLocation newDataNodeLocation) {
+    for (int i = 0; i < replicaSet.getDataNodeLocationsSize(); i++) {
+      if (replicaSet.getDataNodeLocations().get(i).getDataNodeId()
+          == newDataNodeLocation.getDataNodeId()) {
+        replicaSet.getDataNodeLocations().set(i, newDataNodeLocation);
+        return;
+      }
+    }
+  }
+
   public void addRegionLocation(TDataNodeLocation node) {
     replicaSet.addToDataNodeLocations(node);
     replicaSet.getDataNodeLocations().sort(TDataNodeLocation::compareTo);

Reply via email to