This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch beyyes/remove_node_stability
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/beyyes/remove_node_stability
by this push:
new 20dd3fab35 can use REMOVING data node to execute addRegionPeer in
remove datanode process
20dd3fab35 is described below
commit 20dd3fab353f73139fbaea2039f1e38432c2f7b0
Author: Beyyes <[email protected]>
AuthorDate: Tue Oct 11 20:13:52 2022 +0800
can use REMOVING data node to execute addRegionPeer in remove datanode
process
---
.../iotdb/confignode/persistence/NodeInfo.java | 8 ++-
.../procedure/env/DataNodeRemoveHandler.java | 73 +++++++++++-----------
.../impl/statemachine/RegionMigrateProcedure.java | 40 ++++++++----
3 files changed, 71 insertions(+), 50 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
index fd64713896..7adcf7e389 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
@@ -62,6 +62,8 @@ import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import static
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
+
/**
* The NodeInfo stores cluster node information. The cluster node information
including: 1. DataNode
* information 2. ConfigNode information
@@ -167,7 +169,8 @@ public class NodeInfo implements SnapshotProcessor {
*/
public TSStatus removeDataNode(RemoveDataNodePlan req) {
LOGGER.info(
- "there are {} data node in cluster before executed remove-datanode.sh",
+ "{}, There are {} data node in cluster before executed
remove-datanode.sh",
+ REMOVE_DATANODE_PROCESS,
registeredDataNodes.size());
try {
dataNodeInfoReadWriteLock.writeLock().lock();
@@ -181,7 +184,8 @@ public class NodeInfo implements SnapshotProcessor {
dataNodeInfoReadWriteLock.writeLock().unlock();
}
LOGGER.info(
- "there are {} data node in cluster after executed remove-datanode.sh",
+ "{}, There are {} data node in cluster after executed
remove-datanode.sh",
+ REMOVE_DATANODE_PROCESS,
registeredDataNodes.size());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index f816814bb9..9a7302b8f9 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -69,6 +69,12 @@ public class DataNodeRemoveHandler {
this.configManager = configManager;
}
+ public static String getIdWithRpcEndpoint(TDataNodeLocation location) {
+ return String.format(
+ "[dataNodeId: %s, clientRpcEndPoint: %s]",
+ location.getDataNodeId(), location.getClientRpcEndPoint());
+ }
+
/**
* Get all consensus group id in this node
*
@@ -120,7 +126,8 @@ public class DataNodeRemoveHandler {
}
LOGGER.info(
- "DataNodeRemoveService finished broadcastDisableDataNode to cluster,
disabledDataNode: {}",
+ "{}, DataNodeRemoveService finished broadcastDisableDataNode to
cluster, disabledDataNode: {}",
+ REMOVE_DATANODE_PROCESS,
getIdWithRpcEndpoint(disabledDataNode));
}
@@ -187,16 +194,16 @@ public class DataNodeRemoveHandler {
DataNodeRequestType.CREATE_NEW_REGION_PEER);
LOGGER.info(
- "{}, Send action createNewRegionPeer finished, regionId: {},
destDataNode: {}",
+ "{}, Send action createNewRegionPeer finished, regionId: {},
newPeerDataNodeId: {}",
REMOVE_DATANODE_PROCESS,
regionId,
- destDataNode);
+ getIdWithRpcEndpoint(destDataNode));
if (isFailed(status)) {
LOGGER.error(
- "{}, Send action createNewRegionPeer error, regionId: {},
destDataNode: {}, result: {}",
+ "{}, Send action createNewRegionPeer error, regionId: {},
newPeerDataNodeId: {}, result: {}",
REMOVE_DATANODE_PROCESS,
regionId,
- destDataNode,
+ getIdWithRpcEndpoint(destDataNode),
status);
}
return status;
@@ -223,13 +230,13 @@ public class DataNodeRemoveHandler {
if (!selectedDataNode.isPresent()) {
LOGGER.warn(
"{}, There are no other DataNodes could be selected to perform the
add peer process, "
- + "please check RegionGroup: {} by SQL: show regions",
+ + "please check RegionGroup: {} by show regions sql command",
REMOVE_DATANODE_PROCESS,
regionId);
status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
status.setMessage(
"There are no other DataNodes could be selected to perform the add
peer process, "
- + "please check by show regions command");
+ + "please check by show regions sql command");
return status;
}
@@ -306,19 +313,6 @@ public class DataNodeRemoveHandler {
public TSStatus deleteOldRegionPeer(
TDataNodeLocation originalDataNode, TConsensusGroupId regionId) {
- // when SchemaReplicationFactor==1, execute deleteOldRegionPeer method
will cause error
- // user must delete the related data manually
- if (CONF.getSchemaReplicationFactor() == 1
- && TConsensusGroupType.SchemaRegion.equals(regionId.getType())) {
- String errorMessage =
- "deleteOldRegionPeer is not supported for schemaRegion when
SchemaReplicationFactor equals 1, "
- + "you are supposed to delete the region data of datanode
manually";
- LOGGER.info("{}, {}", REMOVE_DATANODE_PROCESS, errorMessage);
- TSStatus status = new
TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- status.setMessage(errorMessage);
- return status;
- }
-
// when DataReplicationFactor==1, execute deleteOldRegionPeer method will
cause error
// user must delete the related data manually
// TODO if multi-leader supports deleteOldRegionPeer when
DataReplicationFactor==1?
@@ -421,14 +415,14 @@ public class DataNodeRemoveHandler {
* @throws ProcedureException procedure exception
*/
public TSStatus stopDataNode(TDataNodeLocation dataNode) throws
ProcedureException {
- LOGGER.info("Begin to stop Data Node {}", dataNode);
+ LOGGER.info("{}, Begin to stop Data Node {}", REMOVE_DATANODE_PROCESS,
dataNode);
AsyncDataNodeClientPool.getInstance().resetClient(dataNode.getInternalEndPoint());
TSStatus status =
SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithRetry(
dataNode.getInternalEndPoint(), dataNode,
DataNodeRequestType.STOP_DATA_NODE);
configManager.getNodeManager().removeNodeCache(dataNode.getDataNodeId());
- LOGGER.info("stop Data Node {} result: {}", dataNode, status);
+ LOGGER.info("{}, Stop Data Node {} result: {}", REMOVE_DATANODE_PROCESS,
dataNode, status);
return status;
}
@@ -547,15 +541,16 @@ public class DataNodeRemoveHandler {
configManager.getConsensusManager().write(new
RemoveDataNodePlan(removeDataNodes));
}
- public void changeRegionLeader(TConsensusGroupId regionId, TDataNodeLocation
tDataNodeLocation) {
+ public void changeRegionLeader(TConsensusGroupId regionId, TDataNodeLocation
originalDataNode) {
Optional<TDataNodeLocation> newLeaderNode =
- filterDataNodeWithOtherRegionReplica(regionId, tDataNodeLocation);
+ filterDataNodeWithOtherRegionReplica(regionId, originalDataNode);
if (newLeaderNode.isPresent()) {
SyncDataNodeClientPool.getInstance()
.changeRegionLeader(
- regionId, tDataNodeLocation.getInternalEndPoint(),
newLeaderNode.get());
+ regionId, originalDataNode.getInternalEndPoint(),
newLeaderNode.get());
LOGGER.info(
- "Change region leader finished, region is {}, newLeaderNode is {}",
+ "{}, Change region leader finished, regionId: {}, newLeaderNode: {}",
+ REMOVE_DATANODE_PROCESS,
regionId,
newLeaderNode);
}
@@ -582,16 +577,24 @@ public class DataNodeRemoveHandler {
.map(TDataNodeConfiguration::getLocation)
.collect(Collectors.toList());
- // TODO replace findAny() by select the low load node.
- return regionReplicaNodes.stream()
- .filter(e -> aliveDataNodes.contains(e) && !e.equals(filterLocation))
- .findAny();
- }
+ // filter the RUNNING datanode firstly
+ // if all the datanodes are not in RUNNING status, choose the REMOVING
datanode
+ // because REMOVING datanode is also alive, it can execute rpc request
+ if (aliveDataNodes.isEmpty()) {
+ aliveDataNodes =
+
configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Removing).stream()
+ .map(TDataNodeConfiguration::getLocation)
+ .collect(Collectors.toList());
+ }
- private String getIdWithRpcEndpoint(TDataNodeLocation location) {
- return String.format(
- "dataNodeId: %s, clientRpcEndPoint: %s",
- location.getDataNodeId(), location.getClientRpcEndPoint());
+ // TODO return the node which has lowest load.
+ for (TDataNodeLocation regionReplicaNode : regionReplicaNodes) {
+ if (aliveDataNodes.contains(regionReplicaNode) &&
!regionReplicaNode.equals(filterLocation)) {
+ return Optional.of(regionReplicaNode);
+ }
+ }
+
+ return Optional.empty();
}
/**
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
index 6d4343c084..2d17ffe374 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
@@ -39,6 +39,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import static
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
import static org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS;
/** region migrate procedure */
@@ -92,8 +93,7 @@ public class RegionMigrateProcedure
case ADD_REGION_PEER:
tsStatus =
env.getDataNodeRemoveHandler().addRegionPeer(destDataNode, consensusGroupId);
if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) {
- waitForOneMigrationStepFinished(consensusGroupId);
- LOG.info("Wait for ADD_REGION_PEER finished, regionId: {}",
consensusGroupId);
+ waitForOneMigrationStepFinished(consensusGroupId, state);
} else {
throw new ProcedureException("Failed to add region peer");
}
@@ -108,8 +108,7 @@ public class RegionMigrateProcedure
env.getDataNodeRemoveHandler()
.removeRegionPeer(originalDataNode, destDataNode,
consensusGroupId);
if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) {
- waitForOneMigrationStepFinished(consensusGroupId);
- LOG.info("Wait REMOVE_REGION_PEER finished, regionId: {}",
consensusGroupId);
+ waitForOneMigrationStepFinished(consensusGroupId, state);
} else {
throw new ProcedureException("Failed to remove region peer");
}
@@ -120,8 +119,7 @@ public class RegionMigrateProcedure
env.getDataNodeRemoveHandler()
.deleteOldRegionPeer(originalDataNode, consensusGroupId);
if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) {
- waitForOneMigrationStepFinished(consensusGroupId);
- LOG.info("Wait for DELETE_OLD_REGION_PEER finished, regionId: {}",
consensusGroupId);
+ waitForOneMigrationStepFinished(consensusGroupId, state);
}
// remove consensus group after a node stop, which will be failed,
but we will
// continuously execute.
@@ -134,15 +132,18 @@ public class RegionMigrateProcedure
}
} catch (Exception e) {
LOG.error(
- "Meets error in region migrate state, please do the rollback
operation yourself manually according to the error message!!! "
+ "{}, Meets error in region migrate state, "
+ + "please do the rollback operation yourself manually according
to the error message!!! "
+ "error state: {}, migrateResult: {}",
+ REMOVE_DATANODE_PROCESS,
state,
migrateResult);
if (isRollbackSupported(state)) {
setFailure(new ProcedureException("Region migrate failed at state: " +
state));
} else {
LOG.error(
- "Failed state is not support rollback, filed state {},
originalDataNode: {}",
+ "{}, Failed state is not support rollback, filed state {},
originalDataNode: {}",
+ REMOVE_DATANODE_PROCESS,
state,
originalDataNode);
if (getCycles() > RETRY_THRESHOLD) {
@@ -249,8 +250,15 @@ public class RegionMigrateProcedure
return false;
}
- public TSStatus waitForOneMigrationStepFinished(TConsensusGroupId
consensusGroupId)
- throws Exception {
+ public TSStatus waitForOneMigrationStepFinished(
+ TConsensusGroupId consensusGroupId, RegionTransitionState state) throws
Exception {
+
+ LOG.info(
+ "{}, Wait for state {} finished, regionId: {}",
+ REMOVE_DATANODE_PROCESS,
+ state,
+ consensusGroupId);
+
TSStatus status = new TSStatus(SUCCESS_STATUS.getStatusCode());
synchronized (regionMigrateLock) {
try {
@@ -262,7 +270,7 @@ public class RegionMigrateProcedure
String.format("Region migrate failed, regionId: %s",
consensusGroupId));
}
} catch (InterruptedException e) {
- LOG.error("region migrate {} interrupt", consensusGroupId, e);
+ LOG.error("{}, region migrate {} interrupt", REMOVE_DATANODE_PROCESS,
consensusGroupId, e);
Thread.currentThread().interrupt();
status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
status.setMessage("wait region migrate interrupt," + e.getMessage());
@@ -274,14 +282,20 @@ public class RegionMigrateProcedure
/** DataNode report region migrate result to ConfigNode, and continue */
public void notifyTheRegionMigrateFinished(TRegionMigrateResultReportReq
req) {
- LOG.info("ConfigNode received DataNode reported region migrate result: {}
", req);
+ LOG.info(
+ "{}, ConfigNode received DataNode reported region migrate result: {}",
+ REMOVE_DATANODE_PROCESS,
+ req);
// TODO the req is used in roll back
synchronized (regionMigrateLock) {
TSStatus migrateStatus = req.getMigrateResult();
// migrate failed
if (migrateStatus.getCode() != SUCCESS_STATUS.getStatusCode()) {
- LOG.info("Region migrate executed failed in DataNode, migrateStatus:
{}", migrateStatus);
+ LOG.info(
+ "{}, Region migrate executed failed in DataNode, migrateStatus:
{}",
+ REMOVE_DATANODE_PROCESS,
+ migrateStatus);
migrateSuccess = false;
migrateResult = migrateStatus.toString();
}