This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6d46ceb4a05 [IOTDB-6291] Update RegionGroup configuration when update
DataNode configuration (#11914)
6d46ceb4a05 is described below
commit 6d46ceb4a053b12b6ebd01c11c654c2cf4f86c82
Author: Yongzao <[email protected]>
AuthorDate: Wed Jan 17 17:34:48 2024 +0800
[IOTDB-6291] Update RegionGroup configuration when update DataNode
configuration (#11914)
---
.../it/cluster/IoTDBClusterRestartIT.java | 70 ++++++++++++++++++++++
.../persistence/executor/ConfigPlanExecutor.java | 9 ++-
.../partition/DatabasePartitionTable.java | 10 ++++
.../persistence/partition/PartitionInfo.java | 19 ++++++
.../persistence/partition/RegionGroup.java | 15 +++++
5 files changed, 121 insertions(+), 2 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java
index 8b22780f8d2..8e5e17b17bf 100644
---
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java
@@ -19,11 +19,19 @@
package org.apache.iotdb.confignode.it.cluster;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.EnvUtils;
import org.apache.iotdb.it.env.cluster.config.MppBaseConfig;
@@ -33,9 +41,11 @@ import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -46,7 +56,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;
@@ -59,6 +71,8 @@ public class IoTDBClusterRestartIT {
private static final int testConfigNodeNum = 3, testDataNodeNum = 2;
+ private static final long testTimePartitionInterval = 604800000;
+
@Before
public void setUp() {
EnvFactory.getEnv()
@@ -114,6 +128,45 @@ public class IoTDBClusterRestartIT {
@Test
public void clusterRestartAfterUpdateDataNodeTest()
throws InterruptedException, ClientManagerException, IOException,
TException {
+ // Create default Database
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ TSStatus status = client.setDatabase(new
TDatabaseSchema("root.database"));
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+ }
+ // Create some DataPartitions to extend 2 DataRegionGroups
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap =
+ ConfigNodeTestUtils.constructPartitionSlotsMap(
+ "root.database", 0, 10, 0, 10, testTimePartitionInterval);
+ TDataPartitionReq dataPartitionReq = new
TDataPartitionReq(partitionSlotsMap);
+ TDataPartitionTableResp dataPartitionTableResp = null;
+ for (int retry = 0; retry < 5; retry++) {
+ // Build new Client since it's unstable in Win8 environment
+ try (SyncConfigNodeIServiceClient configNodeClient =
+ (SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ dataPartitionTableResp =
configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
+ if (dataPartitionTableResp != null) {
+ break;
+ }
+ } catch (Exception e) {
+ // Retry sometimes in order to avoid request timeout
+ logger.error(e.getMessage());
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+ Assert.assertNotNull(dataPartitionTableResp);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
dataPartitionTableResp.getStatus().getCode());
+ Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
+ ConfigNodeTestUtils.checkDataPartitionTable(
+ "root.database",
+ 0,
+ 10,
+ 0,
+ 10,
+ testTimePartitionInterval,
+ dataPartitionTableResp.getDataPartitionTable());
+
// Shutdown all DataNodes
for (int i = 0; i < testDataNodeNum; i++) {
EnvFactory.getEnv().shutdownDataNode(i);
@@ -151,12 +204,29 @@ public class IoTDBClusterRestartIT {
// Check DataNode EndPoint
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ // Check update in NodeInfo
TShowClusterResp showClusterResp = client.showCluster();
ConfigNodeTestUtils.checkNodeConfig(
showClusterResp.getConfigNodeList(),
showClusterResp.getDataNodeList(),
EnvFactory.getEnv().getConfigNodeWrapperList(),
dataNodeWrapperList);
+
+ // Check update in PartitionInfo
+ TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq());
+ showRegionResp
+ .getRegionInfoList()
+ .forEach(
+ regionInfo -> {
+ AtomicBoolean matched = new AtomicBoolean(false);
+ dataNodeWrapperList.forEach(
+ dataNodeWrapper -> {
+ if (regionInfo.getClientRpcPort() ==
dataNodeWrapper.getPort()) {
+ matched.set(true);
+ }
+ });
+ Assert.assertTrue(matched.get());
+ });
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 3a9994848c3..a46a9e213c1 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -286,15 +286,20 @@ public class ConfigPlanExecutor {
public TSStatus executeNonQueryPlan(ConfigPhysicalPlan physicalPlan)
throws UnknownPhysicalPlanTypeException {
+ TSStatus status;
switch (physicalPlan.getType()) {
case RegisterDataNode:
return nodeInfo.registerDataNode((RegisterDataNodePlan) physicalPlan);
case RemoveDataNode:
return nodeInfo.removeDataNode((RemoveDataNodePlan) physicalPlan);
case UpdateDataNodeConfiguration:
- return nodeInfo.updateDataNode((UpdateDataNodePlan) physicalPlan);
+ status = nodeInfo.updateDataNode((UpdateDataNodePlan) physicalPlan);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return status;
+ }
+ return partitionInfo.updateDataNode((UpdateDataNodePlan) physicalPlan);
case CreateDatabase:
- TSStatus status =
clusterSchemaInfo.createDatabase((DatabaseSchemaPlan) physicalPlan);
+ status = clusterSchemaInfo.createDatabase((DatabaseSchemaPlan)
physicalPlan);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
index 8ba1eac0b07..5a4971b9699 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
@@ -89,6 +89,16 @@ public class DatabasePartitionTable {
this.preDeleted = preDeleted;
}
+ /**
+ * Update the DataNodeLocation in cached RegionGroups.
+ *
+ * <p>Forwards the given location to {@code updateDataNode} of every entry held in
+ * {@code regionGroupMap}. This method does not itself check whether an entry references the
+ * DataNode; that decision is left to the callee.
+ *
+ * @param newDataNodeLocation The new DataNodeLocation.
+ */
+ public void updateDataNode(TDataNodeLocation newDataNodeLocation) {
+ regionGroupMap.forEach(
+ (regionGroupId, regionGroup) ->
regionGroup.updateDataNode(newDataNodeLocation));
+ }
+
/**
* Cache allocation result of new RegionGroups.
*
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index 979ebd7875d..16317152dee 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -39,6 +39,7 @@ import
org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoLi
import
org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
import
org.apache.iotdb.confignode.consensus.request.write.database.DeleteDatabasePlan;
import
org.apache.iotdb.confignode.consensus.request.write.database.PreDeleteDatabasePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
import
org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan;
@@ -139,6 +140,24 @@ public class PartitionInfo implements SnapshotProcessor {
// Consensus read/write interfaces
// ======================================================
+ /**
+ * Thread-safely update DataNodeLocation in RegionGroup.
+ *
+ * <p>Reads the new location from the plan's DataNodeConfiguration and passes it to every
+ * DatabasePartitionTable whose database passes {@code isDatabaseExisted}; entries for other
+ * databases are skipped.
+ *
+ * <p>NOTE(review): no lock is taken inside this method; the thread-safety presumably relies on
+ * {@code databasePartitionTables} being a concurrent map - confirm against its declaration.
+ *
+ * @param updateDataNodePlan UpdateDataNodePlan carrying the new DataNodeConfiguration
+ * @return a TSStatus with code {@link TSStatusCode#SUCCESS_STATUS}; the status is the same
+ * whether or not any RegionGroup was actually changed.
+ */
+ public TSStatus updateDataNode(UpdateDataNodePlan updateDataNodePlan) {
+ TDataNodeLocation newDataNodeLocation =
+ updateDataNodePlan.getDataNodeConfiguration().getLocation();
+ databasePartitionTables.forEach(
+ (database, databasePartitionTable) -> {
+ if (isDatabaseExisted(database)) {
+ databasePartitionTable.updateDataNode(newDataNodeLocation);
+ }
+ });
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+
/**
* Thread-safely create new DatabasePartitionTable.
*
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
index 81fea013c21..f02b6624d4f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
@@ -75,6 +75,21 @@ public class RegionGroup {
return replicaSet.deepCopy();
}
+ /**
+ * Update the DataNodeLocation in TRegionReplicaSet if necessary.
+ *
+ * <p>Scans the replica locations in index order and overwrites the first one whose DataNodeId
+ * equals that of the new location. If no replica has that DataNodeId, the replica set is left
+ * unchanged.
+ *
+ * @param newDataNodeLocation The new DataNodeLocation; matched against replicas by DataNodeId.
+ */
+ public void updateDataNode(TDataNodeLocation newDataNodeLocation) {
+ for (int i = 0; i < replicaSet.getDataNodeLocationsSize(); i++) {
+ if (replicaSet.getDataNodeLocations().get(i).getDataNodeId()
+ == newDataNodeLocation.getDataNodeId()) {
+ replicaSet.getDataNodeLocations().set(i, newDataNodeLocation);
+ return; // stop after the first matching replica
+ }
+ }
+ }
+
public void addRegionLocation(TDataNodeLocation node) {
replicaSet.addToDataNodeLocations(node);
replicaSet.getDataNodeLocations().sort(TDataNodeLocation::compareTo);