This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2c10213db2f Improve robustness of removing peer step of region migration (#13078)
2c10213db2f is described below
commit 2c10213db2f8473e014e45cae39cbaaf160e4e86
Author: Li Yu Heng <[email protected]>
AuthorDate: Fri Aug 2 16:55:30 2024 +0800
Improve robustness of removing peer step of region migration (#13078)
* done
* add suggestion
---
.../apache/iotdb/consensus/iot/IoTConsensus.java | 28 ++++---------
.../consensus/iot/IoTConsensusServerImpl.java | 48 ++++++++++++++--------
.../service/IoTConsensusRPCServiceProcessor.java | 7 ++--
3 files changed, 41 insertions(+), 42 deletions(-)
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 464311dd94d..b659db9e3db 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -334,16 +334,9 @@ public class IoTConsensus implements IConsensus {
KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_DONE);
} catch (ConsensusGroupModifyPeerException e) {
- try {
- logger.info("[IoTConsensus] add remote peer failed, automatic cleanup
side effects...");
-
- // clean up the sync log channel
- impl.notifyPeersToRemoveSyncLogChannel(peer);
-
- } catch (ConsensusGroupModifyPeerException mpe) {
- logger.error(
- "[IoTConsensus] failed to cleanup side effects after failed to add
remote peer", mpe);
- }
+ logger.info("[IoTConsensus] add remote peer failed, automatic cleanup
side effects...");
+ // try to clean up the sync log channel
+ impl.notifyPeersToRemoveSyncLogChannel(peer);
throw new ConsensusException(e);
} finally {
impl.checkAndUnlockSafeDeletedSearchIndex();
@@ -364,12 +357,8 @@ public class IoTConsensus implements IConsensus {
KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.INIT);
- try {
- // let other peers remove the sync channel with target peer
- impl.notifyPeersToRemoveSyncLogChannel(peer);
- } catch (ConsensusGroupModifyPeerException e) {
- throw new ConsensusException(e.getMessage());
- }
+ // let other peers remove the sync channel with target peer
+ impl.notifyPeersToRemoveSyncLogChannel(peer);
KillPoint.setKillPoint(
IoTConsensusRemovePeerCoordinatorKillPoints.AFTER_NOTIFY_PEERS_TO_REMOVE_SYNC_LOG_CHANNEL);
@@ -500,14 +489,11 @@ public class IoTConsensus implements IConsensus {
String previousPeerListStr = impl.getConfiguration().toString();
for (Peer peer : impl.getConfiguration()) {
if (!correctPeers.contains(peer)) {
- try {
- impl.removeSyncLogChannel(peer);
- } catch (ConsensusGroupModifyPeerException e) {
+ if (!impl.removeSyncLogChannel(peer)) {
logger.error(
"[RESET PEER LIST] Failed to remove peer {}'s sync log channel
from group {}",
peer,
- groupId,
- e);
+ groupId);
}
}
}
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 300f3725de5..ef35ae62c99 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -546,8 +546,7 @@ public class IoTConsensusServerImpl {
}
}
- public void notifyPeersToRemoveSyncLogChannel(Peer targetPeer)
- throws ConsensusGroupModifyPeerException {
+ public void notifyPeersToRemoveSyncLogChannel(Peer targetPeer) {
// The configuration will be modified during iterating because we will add
the targetPeer to
// configuration
ImmutableList<Peer> currentMembers =
ImmutableList.copyOf(this.configuration);
@@ -568,12 +567,14 @@ public class IoTConsensusServerImpl {
targetPeer.getEndpoint(),
targetPeer.getNodeId()));
if (!isSuccess(res.status)) {
- throw new ConsensusGroupModifyPeerException(
- String.format("remove sync log channel failed from %s to %s",
peer, targetPeer));
+ logger.warn("removing sync log channel failed from {} to {}",
peer, targetPeer);
}
} catch (Exception e) {
- throw new ConsensusGroupModifyPeerException(
- String.format("error when removing sync log channel to %s",
peer), e);
+ logger.warn(
+ "Exception happened during removing sync log channel from {} to
{}",
+ peer,
+ targetPeer,
+ e);
}
}
}
@@ -647,20 +648,33 @@ public class IoTConsensusServerImpl {
logger.info("[IoTConsensus] persist new configuration: {}", configuration);
}
- public void removeSyncLogChannel(Peer targetPeer) throws
ConsensusGroupModifyPeerException {
+ /**
+ * @return totally succeed
+ */
+ public boolean removeSyncLogChannel(Peer targetPeer) {
+ // step 1, remove sync channel in LogDispatcher
+ boolean exceptionHappened = false;
+ String suggestion = "";
try {
- // step 1, remove sync channel in LogDispatcher
logDispatcher.removeLogDispatcherThread(targetPeer);
- logger.info("[IoTConsensus] log dispatcher to {} removed and cleanup",
targetPeer);
- // step 2, update configuration
- configuration.remove(targetPeer);
- checkAndUpdateSafeDeletedSearchIndex();
- // step 3, persist configuration
- persistConfiguration();
- logger.info("[IoTConsensus] configuration updated to {}",
this.configuration);
- } catch (IOException e) {
- throw new ConsensusGroupModifyPeerException("error when remove
LogDispatcherThread", e);
+ } catch (Exception e) {
+ logger.warn(
+ "[IoTConsensus] Exception happened during removing log dispatcher
thread, but configuration.dat will still be removed.",
+ e);
+ suggestion = "It's suggested restart the DataNode to remove log
dispatcher thread.";
+ exceptionHappened = true;
+ }
+ if (!exceptionHappened) {
+ logger.info(
+ "[IoTConsensus] Log dispatcher thread to {} has been removed and
cleanup", targetPeer);
}
+ // step 2, update configuration
+ configuration.remove(targetPeer);
+ checkAndUpdateSafeDeletedSearchIndex();
+ // step 3, persist configuration
+ persistConfiguration();
+ logger.info("[IoTConsensus] Configuration updated to {}. {}",
this.configuration, suggestion);
+ return !exceptionHappened;
}
public void persistConfiguration() {
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index a609acd2651..6201ec06ef6 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -208,12 +208,11 @@ public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.Ifa
return new TRemoveSyncLogChannelRes(status);
}
TSStatus responseStatus;
- try {
- impl.removeSyncLogChannel(new Peer(groupId, req.nodeId, req.endPoint));
+ if (impl.removeSyncLogChannel(new Peer(groupId, req.nodeId,
req.endPoint))) {
responseStatus = new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- } catch (ConsensusGroupModifyPeerException e) {
+ } else {
responseStatus = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
- responseStatus.setMessage(e.getMessage());
+ responseStatus.setMessage("remove sync log channel failed");
}
return new TRemoveSyncLogChannelRes(responseStatus);
}