This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0f0816ec20 [IOTDB-4340] Refactor Region migration related interfaces
(#7243)
0f0816ec20 is described below
commit 0f0816ec2071ac2bee32366e69a3c27b595533be
Author: YongzaoDan <[email protected]>
AuthorDate: Wed Sep 7 15:29:57 2022 +0800
[IOTDB-4340] Refactor Region migration related interfaces (#7243)
---
.../confignode/client/DataNodeRequestType.java | 4 +-
.../sync/datanode/SyncDataNodeClientPool.java | 14 +-
.../procedure/env/DataNodeRemoveHandler.java | 144 +++++++++++----------
.../procedure/impl/RegionMigrateProcedure.java | 25 ++--
.../procedure/state/RegionTransitionState.java | 4 +-
.../iotdb/db/service/RegionMigrateService.java | 58 ++++-----
.../impl/DataNodeInternalRPCServiceImpl.java | 60 +++++----
thrift/src/main/thrift/datanode.thrift | 73 +++++------
8 files changed, 194 insertions(+), 188 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index 521a13f098..7ed20d7e94 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -24,10 +24,10 @@ public enum DataNodeRequestType {
INVALIDATE_PARTITION_CACHE,
INVALIDATE_PERMISSION_CACHE,
INVALIDATE_SCHEMA_CACHE,
- CREATE_PEER,
+ CREATE_NEW_REGION_PEER,
ADD_REGION_PEER,
REMOVE_REGION_PEER,
- DELETE_PEER,
+ DELETE_OLD_REGION_PEER,
DISABLE_DATA_NODE,
STOP_DATA_NODE,
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java
index 0c85ffda88..2323d5162f 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java
@@ -31,7 +31,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq;
-import org.apache.iotdb.mpp.rpc.thrift.TMigrateRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
import org.apache.iotdb.rpc.RpcUtils;
@@ -85,14 +85,14 @@ public class SyncDataNodeClientPool {
return client.stopDataNode();
case UPDATE_TEMPLATE:
return client.updateTemplate((TUpdateTemplateReq) req);
- case CREATE_PEER:
- return client.createPeerToConsensusGroup((TCreatePeerReq) req);
+ case CREATE_NEW_REGION_PEER:
+ return client.createNewRegionPeer((TCreatePeerReq) req);
case ADD_REGION_PEER:
- return client.addRegionPeer((TMigrateRegionReq) req);
+ return client.addRegionPeer((TMaintainPeerReq) req);
case REMOVE_REGION_PEER:
- return client.removeRegionPeer((TMigrateRegionReq) req);
- case DELETE_PEER:
- return client.deletePeerToConsensusGroup((TMigrateRegionReq) req);
+ return client.removeRegionPeer((TMaintainPeerReq) req);
+ case DELETE_OLD_REGION_PEER:
+ return client.deleteOldRegionPeer((TMaintainPeerReq) req);
default:
return RpcUtils.getStatus(
TSStatusCode.EXECUTE_STATEMENT_ERROR, "Unknown request type: "
+ requestType);
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index 1f3e53a730..e20f691d49 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -38,7 +38,7 @@ import
org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq;
-import org.apache.iotdb.mpp.rpc.thrift.TMigrateRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
@@ -136,110 +136,111 @@ public class DataNodeRemoveHandler {
}
/**
- * Send to DataNode, add region peer
+ * Order the specific ConsensusGroup to add peer for the new RegionReplica.
*
- * @param originalDataNode old location data node
- * @param destDataNode dest data node
+ * <p>The add peer interface could be invoked at any DataNode who contains
one of the
+ * RegionReplica of the specified ConsensusGroup except the new one
+ *
+ * @param destDataNode The DataNodeLocation where the new RegionReplica is
created
* @param regionId region id
- * @return migrate status
+ * @return TSStatus
*/
- public TSStatus addRegionPeer(
- TDataNodeLocation originalDataNode,
- TDataNodeLocation destDataNode,
- TConsensusGroupId regionId) {
+ public TSStatus addRegionPeer(TDataNodeLocation destDataNode,
TConsensusGroupId regionId) {
TSStatus status;
- Optional<TDataNodeLocation> otherNode = findNodeOfAnotherReplica(regionId,
originalDataNode);
- if (!otherNode.isPresent()) {
+
+ // Here we pick the DataNode who contains one of the RegionReplica of the
specified
+ // ConsensusGroup except the new one
+ // in order to notify the origin ConsensusGroup that another peer is
created and demand to join
+ Optional<TDataNodeLocation> selectedDataNode =
+ filterDataNodeWithOtherRegionReplica(regionId, destDataNode);
+ if (!selectedDataNode.isPresent()) {
LOGGER.warn(
- "No other Node to change region leader, check by show regions,
region: {}", regionId);
+ "There are no other DataNodes could be selected to perform the add
peer process, please check RegionGroup: {} by SQL: show regions",
+ regionId);
status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- status.setMessage("No other Node to change region leader, check by show
regions");
+ status.setMessage(
+ "There are no other DataNodes could be selected to perform the add
peer process, please check by SQL: show regions");
return status;
}
- TMigrateRegionReq migrateRegionReq =
- new TMigrateRegionReq(regionId, originalDataNode, destDataNode);
- migrateRegionReq.setNewLeaderNode(otherNode.get());
-
- // send to otherNode node
+ // Send addRegionPeer request to the selected DataNode
+ TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId,
selectedDataNode.get());
status =
SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithRetry(
- otherNode.get().getInternalEndPoint(),
- migrateRegionReq,
+ selectedDataNode.get().getInternalEndPoint(),
+ maintainPeerReq,
DataNodeRequestType.ADD_REGION_PEER);
LOGGER.info(
"Send region {} add peer action to {}, wait it finished",
regionId,
- otherNode.get().getInternalEndPoint());
+ selectedDataNode.get().getInternalEndPoint());
return status;
}
/**
- * Send to DataNode, remove region
+ * Order the specific ConsensusGroup to remove peer for the old
RegionReplica.
*
- * @param originalDataNode old location data node
- * @param destDataNode dest data node
+ * <p>The remove peer interface could be invoked at any DataNode who
contains one of the
+ * RegionReplica of the specified ConsensusGroup except the origin one
+ *
+ * @param originalDataNode The DataNodeLocation who contains the original
RegionReplica
* @param regionId region id
- * @return migrate status
+ * @return TSStatus
*/
- public TSStatus removeRegionPeer(
- TDataNodeLocation originalDataNode,
- TDataNodeLocation destDataNode,
- TConsensusGroupId regionId) {
+ public TSStatus removeRegionPeer(TDataNodeLocation originalDataNode,
TConsensusGroupId regionId) {
TSStatus status;
- Optional<TDataNodeLocation> otherNode = findNodeOfAnotherReplica(regionId,
originalDataNode);
- if (!otherNode.isPresent()) {
+
+ // Here we pick the DataNode who contains one of the RegionReplica of the
specified
+ // ConsensusGroup except the origin one
+ // in order to notify the new ConsensusGroup that the origin peer should
secede now
+ Optional<TDataNodeLocation> selectedDataNode =
+ filterDataNodeWithOtherRegionReplica(regionId, originalDataNode);
+ if (!selectedDataNode.isPresent()) {
LOGGER.warn(
- "No other Node to change region leader, check by show regions,
region: {}", regionId);
+ "There are no other DataNodes could be selected to perform the
remove peer process, please check RegionGroup: {} by SQL: show regions",
+ regionId);
status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- status.setMessage("No other Node to change region leader, check by show
regions");
+ status.setMessage(
+ "There are no other DataNodes could be selected to perform the
remove peer process, please check by SQL: show regions");
return status;
}
- TMigrateRegionReq migrateRegionReq =
- new TMigrateRegionReq(regionId, originalDataNode, destDataNode);
- migrateRegionReq.setNewLeaderNode(otherNode.get());
-
- // send to other node
+ // Send addRegionPeer request to the selected DataNode
+ TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId,
selectedDataNode.get());
status =
SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithRetry(
- otherNode.get().getInternalEndPoint(),
- migrateRegionReq,
+ selectedDataNode.get().getInternalEndPoint(),
+ maintainPeerReq,
DataNodeRequestType.REMOVE_REGION_PEER);
LOGGER.info(
"Send region {} remove peer to {}, wait it finished",
regionId,
- otherNode.get().getInternalEndPoint());
+ selectedDataNode.get().getInternalEndPoint());
return status;
}
/**
- * Send to DataNode, delete peer from originalDataNode node.
+ * Delete a Region peer in the given ConsensusGroup and all of its data on
the specified DataNode
*
* <p>If the originalDataNode is down, we should delete local data and do
other cleanup works
* manually.
*
- * @param originalDataNode data node where the peer to be deleted locates
- * @param destDataNode dest data node to be migrated
+ * @param originalDataNode The DataNodeLocation who contains the original
RegionReplica
* @param regionId region id
- * @return migrate status
+ * @return TSStatus
*/
- public TSStatus deletePeer(
- TDataNodeLocation originalDataNode,
- TDataNodeLocation destDataNode,
- TConsensusGroupId regionId) {
+ public TSStatus deleteOldRegionPeer(
+ TDataNodeLocation originalDataNode, TConsensusGroupId regionId) {
TSStatus status;
- TMigrateRegionReq migrateRegionReq =
- new TMigrateRegionReq(regionId, originalDataNode, destDataNode);
- migrateRegionReq.setNewLeaderNode(originalDataNode);
+ TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId,
originalDataNode);
status =
SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithRetry(
originalDataNode.getInternalEndPoint(),
- migrateRegionReq,
- DataNodeRequestType.DELETE_PEER);
+ maintainPeerReq,
+ DataNodeRequestType.DELETE_OLD_REGION_PEER);
LOGGER.info(
"Send region {} delete peer action to {}, wait it finished",
regionId,
@@ -304,16 +305,17 @@ public class DataNodeRemoveHandler {
}
/**
- * Create a Peer and become a member of the given consensus group.
+ * Create a new RegionReplica and build the ConsensusGroup on the destined
DataNode
*
- * <p>CreatePeer should be called on a node that does not contain any peer
of the consensus group,
- * to avoid one node having more than one replica.
+ * <p>createNewRegionPeer should be invoked on a DataNode that doesn't
contain any peer of the
+ * specific ConsensusGroup, in order to avoid there exists one DataNode who
has more than one
+ * RegionReplica.
*
- * @param regionId region id, means the given consensus group
- * @param destDataNode dest data node where the peer creates
+ * @param regionId The given ConsensusGroup
+ * @param destDataNode The destined DataNode where the new peer will be
created
* @return status
*/
- public TSStatus createPeer(TConsensusGroupId regionId, TDataNodeLocation
destDataNode) {
+ public TSStatus createNewRegionPeer(TConsensusGroupId regionId,
TDataNodeLocation destDataNode) {
TSStatus status;
List<TDataNodeLocation> regionReplicaNodes =
findRegionReplicaNodes(regionId);
if (regionReplicaNodes.isEmpty()) {
@@ -333,7 +335,9 @@ public class DataNodeRemoveHandler {
status =
SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithRetry(
- destDataNode.getInternalEndPoint(), req,
DataNodeRequestType.CREATE_PEER);
+ destDataNode.getInternalEndPoint(),
+ req,
+ DataNodeRequestType.CREATE_NEW_REGION_PEER);
LOGGER.info("Send create peer for regionId {} on data node {}", regionId,
destDataNode);
if (isFailed(status)) {
@@ -457,7 +461,7 @@ public class DataNodeRemoveHandler {
public void changeRegionLeader(TConsensusGroupId regionId, TDataNodeLocation
tDataNodeLocation) {
Optional<TDataNodeLocation> newLeaderNode =
- findNodeOfAnotherReplica(regionId, tDataNodeLocation);
+ filterDataNodeWithOtherRegionReplica(regionId, tDataNodeLocation);
if (newLeaderNode.isPresent()) {
SyncDataNodeClientPool.getInstance()
.changeRegionLeader(
@@ -469,8 +473,16 @@ public class DataNodeRemoveHandler {
}
}
- private Optional<TDataNodeLocation> findNodeOfAnotherReplica(
- TConsensusGroupId regionId, TDataNodeLocation tDataNodeLocation) {
+ /**
+ * Filter a DataNode who contains other RegionReplica excepts the given one
+ *
+ * @param regionId The specific RegionId
+ * @param filterLocation The DataNodeLocation that should be filtered
+ * @return A DataNodeLocation that contains other RegionReplica and
different from the
+ * filterLocation
+ */
+ private Optional<TDataNodeLocation> filterDataNodeWithOtherRegionReplica(
+ TConsensusGroupId regionId, TDataNodeLocation filterLocation) {
List<TDataNodeLocation> regionReplicaNodes =
findRegionReplicaNodes(regionId);
if (regionReplicaNodes.isEmpty()) {
LOGGER.warn("Not find region replica nodes, region: {}", regionId);
@@ -478,8 +490,6 @@ public class DataNodeRemoveHandler {
}
// TODO replace findAny() by select the low load node.
- Optional<TDataNodeLocation> newLeaderNode =
- regionReplicaNodes.stream().filter(e ->
!e.equals(tDataNodeLocation)).findAny();
- return newLeaderNode;
+ return regionReplicaNodes.stream().filter(e ->
!e.equals(filterLocation)).findAny();
}
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java
index 937130a636..1fa944b113 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java
@@ -74,20 +74,18 @@ public class RegionMigrateProcedure
if (consensusGroupId == null) {
return Flow.NO_MORE_STATE;
}
- TSStatus tsStatus = null;
+ TSStatus tsStatus;
try {
switch (state) {
case REGION_MIGRATE_PREPARE:
- setNextState(RegionTransitionState.CREATE_PEER);
+ setNextState(RegionTransitionState.CREATE_NEW_REGION_PEER);
break;
- case CREATE_PEER:
- env.getDataNodeRemoveHandler().createPeer(consensusGroupId,
destDataNode);
+ case CREATE_NEW_REGION_PEER:
+ env.getDataNodeRemoveHandler().createNewRegionPeer(consensusGroupId,
destDataNode);
setNextState(RegionTransitionState.ADD_REGION_PEER);
break;
case ADD_REGION_PEER:
- tsStatus =
- env.getDataNodeRemoveHandler()
- .addRegionPeer(originalDataNode, destDataNode,
consensusGroupId);
+ tsStatus =
env.getDataNodeRemoveHandler().addRegionPeer(destDataNode, consensusGroupId);
if (tsStatus.getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
waitForOneMigrationStepFinished(consensusGroupId);
LOG.info("Wait for region {} add peer finished",
consensusGroupId);
@@ -102,26 +100,25 @@ public class RegionMigrateProcedure
break;
case REMOVE_REGION_PEER:
tsStatus =
- env.getDataNodeRemoveHandler()
- .removeRegionPeer(originalDataNode, destDataNode,
consensusGroupId);
+
env.getDataNodeRemoveHandler().removeRegionPeer(originalDataNode,
consensusGroupId);
if (tsStatus.getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
waitForOneMigrationStepFinished(consensusGroupId);
LOG.info("Wait for region {} remove peer finished",
consensusGroupId);
} else {
throw new ProcedureException("Failed to remove region peer");
}
- setNextState(RegionTransitionState.DELETE_PEER);
+ setNextState(RegionTransitionState.DELETE_OLD_REGION_PEER);
break;
- case DELETE_PEER:
+ case DELETE_OLD_REGION_PEER:
tsStatus =
env.getDataNodeRemoveHandler()
- .deletePeer(originalDataNode, destDataNode,
consensusGroupId);
+ .deleteOldRegionPeer(originalDataNode, consensusGroupId);
if (tsStatus.getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
waitForOneMigrationStepFinished(consensusGroupId);
LOG.info("Wait for region {} remove consensus group finished",
consensusGroupId);
}
- // remove consensus group after a node stop, which will be failed,
but we will continue
- // execute.
+ // remove consensus group after a node stop, which will be failed,
but we will
+ // continuously execute.
setNextState(RegionTransitionState.UPDATE_REGION_LOCATION_CACHE);
break;
case UPDATE_REGION_LOCATION_CACHE:
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java
index 71aacde1f2..a21a6af941 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java
@@ -21,10 +21,10 @@ package org.apache.iotdb.confignode.procedure.state;
public enum RegionTransitionState {
REGION_MIGRATE_PREPARE,
- CREATE_PEER,
+ CREATE_NEW_REGION_PEER,
ADD_REGION_PEER,
CHANGE_REGION_LEADER,
REMOVE_REGION_PEER,
- DELETE_PEER,
+ DELETE_OLD_REGION_PEER,
UPDATE_REGION_LOCATION_CACHE
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
index d4170f4dfe..9cbe669564 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
@@ -39,7 +39,7 @@ import
org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.rescon.AbstractPoolManager;
-import org.apache.iotdb.mpp.rpc.thrift.TMigrateRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
@@ -69,16 +69,16 @@ public class RegionMigrateService implements IService {
* @param req TMigrateRegionReq
* @return submit task succeed?
*/
- public synchronized boolean submitAddRegionPeerTask(TMigrateRegionReq req) {
+ public synchronized boolean submitAddRegionPeerTask(TMaintainPeerReq req) {
boolean submitSucceed = true;
try {
- regionMigratePool.submit(new AddRegionPeerTask(req.getRegionId(),
req.getToNode()));
+ regionMigratePool.submit(new AddRegionPeerTask(req.getRegionId(),
req.getDestNode()));
} catch (Exception e) {
LOGGER.error(
- "submit add region peer task error. region: {}, to: {}.",
+ "Submit add region peer task error for Region: {} on DataNode: {}.",
req.getRegionId(),
- req.getToNode().getInternalEndPoint().getIp(),
+ req.getDestNode().getInternalEndPoint().getIp(),
e);
submitSucceed = false;
}
@@ -91,17 +91,16 @@ public class RegionMigrateService implements IService {
* @param req TMigrateRegionReq
* @return submit task succeed?
*/
- public synchronized boolean submitRemoveRegionPeerTask(TMigrateRegionReq
req) {
+ public synchronized boolean submitRemoveRegionPeerTask(TMaintainPeerReq req)
{
boolean submitSucceed = true;
try {
- regionMigratePool.submit(
- new RemoveRegionPeerTask(req.getRegionId(), req.getFromNode(),
req.getNewLeaderNode()));
+ regionMigratePool.submit(new RemoveRegionPeerTask(req.getRegionId(),
req.getDestNode()));
} catch (Exception e) {
LOGGER.error(
- "submit remove region peer task error. region: {}, from: {}.",
+ "Submit remove region peer task error for Region: {} on DataNode:
{}.",
req.getRegionId(),
- req.getFromNode().getInternalEndPoint().getIp(),
+ req.getDestNode().getInternalEndPoint().getIp(),
e);
submitSucceed = false;
}
@@ -114,17 +113,17 @@ public class RegionMigrateService implements IService {
* @param req TMigrateRegionReq
* @return submit task succeed?
*/
- public synchronized boolean
submitRemoveRegionConsensusGroupTask(TMigrateRegionReq req) {
+ public synchronized boolean
submitRemoveRegionConsensusGroupTask(TMaintainPeerReq req) {
boolean submitSucceed = true;
try {
regionMigratePool.submit(
- new RemoveRegionConsensusGroupTask(req.getRegionId(),
req.getFromNode()));
+ new RemoveRegionConsensusGroupTask(req.getRegionId(),
req.getDestNode()));
} catch (Exception e) {
LOGGER.error(
- "submit remove region consensus group task error. region: {}, from:
{}.",
+ "Submit remove region peer task error for Region: {} on DataNode:
{}.",
req.getRegionId(),
- req.getFromNode().getInternalEndPoint().getIp(),
+ req.getDestNode().getInternalEndPoint().getIp(),
e);
submitSucceed = false;
}
@@ -237,22 +236,23 @@ public class RegionMigrateService implements IService {
private static class AddRegionPeerTask implements Runnable {
private static final Logger taskLogger =
LoggerFactory.getLogger(AddRegionPeerTask.class);
- // migrate which region
+ // The RegionGroup that shall perform the add peer process
private final TConsensusGroupId tRegionId;
- // migrate to which node
- private final TDataNodeLocation toNode;
+ // The DataNode that selected to perform the add peer process
+ private final TDataNodeLocation selectedDataNode;
- public AddRegionPeerTask(TConsensusGroupId tRegionId, TDataNodeLocation
toNode) {
+ public AddRegionPeerTask(TConsensusGroupId tRegionId, TDataNodeLocation
selectedDataNode) {
this.tRegionId = tRegionId;
- this.toNode = toNode;
+ this.selectedDataNode = selectedDataNode;
}
@Override
public void run() {
TSStatus runResult = addPeer();
if (isFailed(runResult)) {
- reportFailed(tRegionId, toNode,
TRegionMigrateFailedType.AddPeerFailed, runResult);
+ reportFailed(
+ tRegionId, selectedDataNode,
TRegionMigrateFailedType.AddPeerFailed, runResult);
return;
}
@@ -263,7 +263,7 @@ public class RegionMigrateService implements IService {
ConsensusGroupId regionId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(tRegionId);
TSStatus status = new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
ConsensusGenericResponse resp = null;
- TEndPoint newPeerNode = getConsensusEndPoint(toNode, regionId);
+ TEndPoint newPeerNode = getConsensusEndPoint(selectedDataNode, regionId);
taskLogger.info("Start to add peer {} for region {}", newPeerNode,
tRegionId);
boolean addPeerSucceed = true;
for (int i = 0; i < RETRY; i++) {
@@ -334,23 +334,23 @@ public class RegionMigrateService implements IService {
private static class RemoveRegionPeerTask implements Runnable {
private static final Logger taskLogger =
LoggerFactory.getLogger(RemoveRegionPeerTask.class);
- // migrate which region
+ // The RegionGroup that shall perform the add peer process
private final TConsensusGroupId tRegionId;
- // migrate from which node
- private final TDataNodeLocation fromNode;
+ // The DataNode that selected to perform the add peer process
+ private final TDataNodeLocation selectedDataNode;
- public RemoveRegionPeerTask(
- TConsensusGroupId tRegionId, TDataNodeLocation fromNode,
TDataNodeLocation newLeaderNode) {
+ public RemoveRegionPeerTask(TConsensusGroupId tRegionId, TDataNodeLocation
selectedDataNode) {
this.tRegionId = tRegionId;
- this.fromNode = fromNode;
+ this.selectedDataNode = selectedDataNode;
}
@Override
public void run() {
TSStatus runResult = removePeer();
if (isFailed(runResult)) {
- reportFailed(tRegionId, fromNode,
TRegionMigrateFailedType.RemovePeerFailed, runResult);
+ reportFailed(
+ tRegionId, selectedDataNode,
TRegionMigrateFailedType.RemovePeerFailed, runResult);
}
reportSucceed(tRegionId);
@@ -369,7 +369,7 @@ public class RegionMigrateService implements IService {
private TSStatus removePeer() {
ConsensusGroupId regionId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(tRegionId);
TSStatus status = new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- TEndPoint oldPeerNode = getConsensusEndPoint(fromNode, regionId);
+ TEndPoint oldPeerNode = getConsensusEndPoint(selectedDataNode, regionId);
taskLogger.info("start to remove peer {} for region {}", oldPeerNode,
regionId);
ConsensusGenericResponse resp = null;
boolean removePeerSucceed = true;
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index bb026e83cc..3394d5e2d9 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -96,7 +96,7 @@ import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq;
-import org.apache.iotdb.mpp.rpc.thrift.TMigrateRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchRequest;
@@ -639,7 +639,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
}
@Override
- public TSStatus createPeerToConsensusGroup(TCreatePeerReq req) throws
TException {
+ public TSStatus createNewRegionPeer(TCreatePeerReq req) throws TException {
ConsensusGroupId regionId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getRegionId());
List<Peer> peers =
@@ -655,39 +655,53 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
}
@Override
- public TSStatus removeRegionPeer(TMigrateRegionReq req) throws TException {
+ public TSStatus addRegionPeer(TMaintainPeerReq req) throws TException {
TConsensusGroupId regionId = req.getRegionId();
- String fromNodeIp = req.getFromNode().getInternalEndPoint().getIp();
+ String selectedDataNodeIP =
req.getDestNode().getInternalEndPoint().getIp();
+ boolean submitSucceed =
RegionMigrateService.getInstance().submitAddRegionPeerTask(req);
+ TSStatus status = new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ if (submitSucceed) {
+ LOGGER.info(
+ "Successfully submit a add region peer task for region: {} on
DataNode: {}",
+ regionId,
+ selectedDataNodeIP);
+ return status;
+ }
+ status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+ status.setMessage("submit add region peer task failed, region: " +
regionId);
+ return status;
+ }
+
+ @Override
+ public TSStatus removeRegionPeer(TMaintainPeerReq req) throws TException {
+ TConsensusGroupId regionId = req.getRegionId();
+ String selectedDataNodeIP =
req.getDestNode().getInternalEndPoint().getIp();
boolean submitSucceed =
RegionMigrateService.getInstance().submitRemoveRegionPeerTask(req);
TSStatus status = new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
if (submitSucceed) {
LOGGER.info(
- "succeed to submit a remove region peer task. region: {}, from {}",
+ "Successfully to submit a remove region peer task for region: {} on
DataNode: {}",
regionId,
- req.getFromNode().getInternalEndPoint());
+ selectedDataNodeIP);
return status;
}
status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- status.setMessage(
- "submit region remove region peer task failed, region: "
- + regionId
- + ", from "
- + req.getFromNode().getInternalEndPoint());
+ status.setMessage("submit add region peer task failed, region: " +
regionId);
return status;
}
@Override
- public TSStatus deletePeerToConsensusGroup(TMigrateRegionReq req) throws
TException {
+ public TSStatus deleteOldRegionPeer(TMaintainPeerReq req) throws TException {
TConsensusGroupId regionId = req.getRegionId();
- String fromNodeIp = req.getFromNode().getInternalEndPoint().getIp();
+ String selectedDataNodeIP =
req.getDestNode().getInternalEndPoint().getIp();
boolean submitSucceed =
RegionMigrateService.getInstance().submitRemoveRegionConsensusGroupTask(req);
TSStatus status = new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
if (submitSucceed) {
LOGGER.info(
- "succeed to submit a remove region consensus group task. region: {},
from {}",
+ "Successfully to submit a remove region consensus group task for
region: {} on DataNode: {}",
regionId,
- fromNodeIp);
+ selectedDataNodeIP);
return status;
}
status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
@@ -756,22 +770,6 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
- @Override
- public TSStatus addRegionPeer(TMigrateRegionReq req) throws TException {
- TConsensusGroupId regionId = req.getRegionId();
- String toNodeIp = req.getToNode().getInternalEndPoint().getIp();
- boolean submitSucceed =
RegionMigrateService.getInstance().submitAddRegionPeerTask(req);
- TSStatus status = new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- if (submitSucceed) {
- LOGGER.info(
- "succeed to submit a add region peer task. region: {}, to {}",
regionId, toNodeIp);
- return status;
- }
- status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- status.setMessage("submit add region peer task failed, region: " +
regionId);
- return status;
- }
-
private TEndPoint getConsensusEndPoint(
TDataNodeLocation nodeLocation, ConsensusGroupId regionId) {
if (regionId instanceof DataRegionId) {
diff --git a/thrift/src/main/thrift/datanode.thrift
b/thrift/src/main/thrift/datanode.thrift
index a84cf62ee4..e8f463852a 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -20,38 +20,36 @@ include "common.thrift"
namespace java org.apache.iotdb.mpp.rpc.thrift
struct TCreateSchemaRegionReq {
- 1: required common.TRegionReplicaSet regionReplicaSet
- 2: required string storageGroup
+ 1: required common.TRegionReplicaSet regionReplicaSet
+ 2: required string storageGroup
}
struct TCreateDataRegionReq {
- 1: required common.TRegionReplicaSet regionReplicaSet
- 2: required string storageGroup
- 3: optional i64 ttl
+ 1: required common.TRegionReplicaSet regionReplicaSet
+ 2: required string storageGroup
+ 3: optional i64 ttl
}
struct TInvalidateCacheReq {
- 1: required bool storageGroup
- 2: required string fullPath
-}
-
-struct TMigrateRegionReq {
- 1: required common.TConsensusGroupId regionId
- 2: required common.TDataNodeLocation fromNode
- 3: required common.TDataNodeLocation toNode
- 4: optional common.TDataNodeLocation newLeaderNode
+ 1: required bool storageGroup
+ 2: required string fullPath
}
struct TRegionLeaderChangeReq {
- 1: required common.TConsensusGroupId regionId
- 2: required common.TDataNodeLocation newLeaderNode
+ 1: required common.TConsensusGroupId regionId
+ 2: required common.TDataNodeLocation newLeaderNode
}
struct TCreatePeerReq {
- 1: required common.TConsensusGroupId regionId
- 2: required list<common.TDataNodeLocation> regionLocations
- 3: required string storageGroup
- 4: optional i64 ttl
+ 1: required common.TConsensusGroupId regionId
+ 2: required list<common.TDataNodeLocation> regionLocations
+ 3: required string storageGroup
+ 4: optional i64 ttl
+}
+
+struct TMaintainPeerReq {
+ 1: required common.TConsensusGroupId regionId
+ 2: required common.TDataNodeLocation destNode
}
struct TFragmentInstanceId {
@@ -210,8 +208,8 @@ struct TUpdateConfigNodeGroupReq {
}
struct TUpdateTemplateReq{
- 1: required byte type
- 2: required binary templateInfo
+ 1: required byte type
+ 2: required binary templateInfo
}
service IDataNodeRPCService {
@@ -284,29 +282,32 @@ service IDataNodeRPCService {
common.TSStatus changeRegionLeader(TRegionLeaderChangeReq req);
/**
- * Create new peer in the given data node for region consensus group
- * @param region id and it's expected locations
+ * Create a new Region peer in the given DataNode for the specified
RegionGroup
+ *
+ * @param TCreatePeerReq which contains RegionId and its colleagues'
locations
*/
- common.TSStatus createPeerToConsensusGroup(TCreatePeerReq req);
+ common.TSStatus createNewRegionPeer(TCreatePeerReq req);
/**
- * Config node will add a region peer to a region group
- * @param add region req which region from one node to other node
+ * Add a Region peer to the specified RegionGroup
+ *
+ * @param TMaintainPeerReq which contains RegionId and the DataNodeLocation
that selected to perform the add peer process
*/
- common.TSStatus addRegionPeer(TMigrateRegionReq req);
+ common.TSStatus addRegionPeer(TMaintainPeerReq req);
/**
- * Config node will remove a region peer to a region group
- * @param remove region peer region from one node to other node
+ * Remove a Region peer from the specified RegionGroup
+ *
+ * @param TMaintainPeerReq which contains RegionId and the DataNodeLocation
that selected to perform the remove peer process
*/
- common.TSStatus removeRegionPeer(TMigrateRegionReq req);
+ common.TSStatus removeRegionPeer(TMaintainPeerReq req);
/**
- * Delete the datanode peer for the given consensus group. Usually a region
group has
- * multiple replicas, thus relates to multiple nodes.
- * @param TMigrateRegionReq which contains the dest datanode to be removed
- */
- common.TSStatus deletePeerToConsensusGroup(TMigrateRegionReq req);
+ * Delete a Region peer in the given ConsensusGroup and all of its data on
the specified DataNode
+ *
+ * @param TMaintainPeerReq which contains RegionId and the DataNodeLocation
where the specified Region peer located
+ */
+ common.TSStatus deleteOldRegionPeer(TMaintainPeerReq req);
/**
* Config node will disable the Data node, the Data node will not accept
read/write request when disabled