This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch mergemaster0808 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 36fbbed12f6b4a7f3963b1202f29886b092d7ff6 Author: Li Yu Heng <[email protected]> AuthorDate: Fri Aug 2 16:55:30 2024 +0800 Improve robustness of removing peer step of region migration (#13078) * done * add suggestion (cherry picked from commit 2c10213db2f8473e014e45cae39cbaaf160e4e86) --- .../apache/iotdb/consensus/iot/IoTConsensus.java | 28 ++++--------- .../consensus/iot/IoTConsensusServerImpl.java | 48 ++++++++++++++-------- .../service/IoTConsensusRPCServiceProcessor.java | 7 ++-- 3 files changed, 41 insertions(+), 42 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java index 464311dd94d..b659db9e3db 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java @@ -334,16 +334,9 @@ public class IoTConsensus implements IConsensus { KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_DONE); } catch (ConsensusGroupModifyPeerException e) { - try { - logger.info("[IoTConsensus] add remote peer failed, automatic cleanup side effects..."); - - // clean up the sync log channel - impl.notifyPeersToRemoveSyncLogChannel(peer); - - } catch (ConsensusGroupModifyPeerException mpe) { - logger.error( - "[IoTConsensus] failed to cleanup side effects after failed to add remote peer", mpe); - } + logger.info("[IoTConsensus] add remote peer failed, automatic cleanup side effects..."); + // try to clean up the sync log channel + impl.notifyPeersToRemoveSyncLogChannel(peer); throw new ConsensusException(e); } finally { impl.checkAndUnlockSafeDeletedSearchIndex(); @@ -364,12 +357,8 @@ public class IoTConsensus implements IConsensus { KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.INIT); - try { - // let other peers remove the sync channel with target peer - impl.notifyPeersToRemoveSyncLogChannel(peer); - } catch (ConsensusGroupModifyPeerException e) { - throw new ConsensusException(e.getMessage()); - } + // let other peers remove the sync channel with target peer + impl.notifyPeersToRemoveSyncLogChannel(peer); KillPoint.setKillPoint( IoTConsensusRemovePeerCoordinatorKillPoints.AFTER_NOTIFY_PEERS_TO_REMOVE_SYNC_LOG_CHANNEL); @@ -500,14 +489,11 @@ public class IoTConsensus implements IConsensus { String previousPeerListStr = impl.getConfiguration().toString(); for (Peer peer : impl.getConfiguration()) { if (!correctPeers.contains(peer)) { - try { - impl.removeSyncLogChannel(peer); - } catch (ConsensusGroupModifyPeerException e) { + if (!impl.removeSyncLogChannel(peer)) { logger.error( "[RESET PEER LIST] Failed to remove peer {}'s sync log channel from group {}", peer, - groupId, - e); + groupId); } } } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java index 300f3725de5..ef35ae62c99 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java @@ -546,8 +546,7 @@ public class IoTConsensusServerImpl { } } - public void notifyPeersToRemoveSyncLogChannel(Peer targetPeer) - throws ConsensusGroupModifyPeerException { + public void notifyPeersToRemoveSyncLogChannel(Peer targetPeer) { // The configuration will be modified during iterating because we will add the targetPeer to // configuration ImmutableList<Peer> currentMembers = ImmutableList.copyOf(this.configuration); @@ -568,12 +567,14 @@ public class IoTConsensusServerImpl { targetPeer.getEndpoint(), targetPeer.getNodeId())); if (!isSuccess(res.status)) { - throw new ConsensusGroupModifyPeerException( - String.format("remove sync log channel failed from %s to %s", peer, targetPeer)); + logger.warn("removing sync log channel failed from {} to {}", peer, targetPeer); } } catch (Exception e) { - throw new ConsensusGroupModifyPeerException( - String.format("error when removing sync log channel to %s", peer), e); + logger.warn( + "Exception happened during removing sync log channel from {} to {}", + peer, + targetPeer, + e); } } } @@ -647,20 +648,33 @@ public class IoTConsensusServerImpl { logger.info("[IoTConsensus] persist new configuration: {}", configuration); } - public void removeSyncLogChannel(Peer targetPeer) throws ConsensusGroupModifyPeerException { + /** + * @return totally succeed + */ + public boolean removeSyncLogChannel(Peer targetPeer) { + // step 1, remove sync channel in LogDispatcher + boolean exceptionHappened = false; + String suggestion = ""; try { - // step 1, remove sync channel in LogDispatcher logDispatcher.removeLogDispatcherThread(targetPeer); - logger.info("[IoTConsensus] log dispatcher to {} removed and cleanup", targetPeer); - // step 2, update configuration - configuration.remove(targetPeer); - checkAndUpdateSafeDeletedSearchIndex(); - // step 3, persist configuration - persistConfiguration(); - logger.info("[IoTConsensus] configuration updated to {}", this.configuration); - } catch (IOException e) { - throw new ConsensusGroupModifyPeerException("error when remove LogDispatcherThread", e); + } catch (Exception e) { + logger.warn( + "[IoTConsensus] Exception happened during removing log dispatcher thread, but configuration.dat will still be removed.", + e); + suggestion = "It's suggested restart the DataNode to remove log dispatcher thread."; + exceptionHappened = true; + } + if (!exceptionHappened) { + logger.info( + "[IoTConsensus] Log dispatcher thread to {} has been removed and cleanup", targetPeer); } + // step 2, update configuration + configuration.remove(targetPeer); + checkAndUpdateSafeDeletedSearchIndex(); + // step 3, persist configuration + persistConfiguration(); + logger.info("[IoTConsensus] Configuration updated to {}. {}", this.configuration, suggestion); + return !exceptionHappened; } public void persistConfiguration() { diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java index a609acd2651..6201ec06ef6 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java @@ -208,12 +208,11 @@ public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.Ifa return new TRemoveSyncLogChannelRes(status); } TSStatus responseStatus; - try { - impl.removeSyncLogChannel(new Peer(groupId, req.nodeId, req.endPoint)); + if (impl.removeSyncLogChannel(new Peer(groupId, req.nodeId, req.endPoint))) { responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); - } catch (ConsensusGroupModifyPeerException e) { + } else { responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); - responseStatus.setMessage(e.getMessage()); + responseStatus.setMessage("remove sync log channel failed"); } return new TRemoveSyncLogChannelRes(responseStatus); }
