This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch mergemaster0808
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 36fbbed12f6b4a7f3963b1202f29886b092d7ff6
Author: Li Yu Heng <[email protected]>
AuthorDate: Fri Aug 2 16:55:30 2024 +0800

    Improve robustness of removing peer step of region migration  (#13078)
    
    * done
    
    * add suggestion
    
    (cherry picked from commit 2c10213db2f8473e014e45cae39cbaaf160e4e86)
---
 .../apache/iotdb/consensus/iot/IoTConsensus.java   | 28 ++++---------
 .../consensus/iot/IoTConsensusServerImpl.java      | 48 ++++++++++++++--------
 .../service/IoTConsensusRPCServiceProcessor.java   |  7 ++--
 3 files changed, 41 insertions(+), 42 deletions(-)

diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 464311dd94d..b659db9e3db 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -334,16 +334,9 @@ public class IoTConsensus implements IConsensus {
       KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_DONE);
 
     } catch (ConsensusGroupModifyPeerException e) {
-      try {
-        logger.info("[IoTConsensus] add remote peer failed, automatic cleanup 
side effects...");
-
-        // clean up the sync log channel
-        impl.notifyPeersToRemoveSyncLogChannel(peer);
-
-      } catch (ConsensusGroupModifyPeerException mpe) {
-        logger.error(
-            "[IoTConsensus] failed to cleanup side effects after failed to add 
remote peer", mpe);
-      }
+      logger.info("[IoTConsensus] add remote peer failed, automatic cleanup 
side effects...");
+      // try to clean up the sync log channel
+      impl.notifyPeersToRemoveSyncLogChannel(peer);
       throw new ConsensusException(e);
     } finally {
       impl.checkAndUnlockSafeDeletedSearchIndex();
@@ -364,12 +357,8 @@ public class IoTConsensus implements IConsensus {
 
     KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.INIT);
 
-    try {
-      // let other peers remove the sync channel with target peer
-      impl.notifyPeersToRemoveSyncLogChannel(peer);
-    } catch (ConsensusGroupModifyPeerException e) {
-      throw new ConsensusException(e.getMessage());
-    }
+    // let other peers remove the sync channel with target peer
+    impl.notifyPeersToRemoveSyncLogChannel(peer);
     KillPoint.setKillPoint(
         
IoTConsensusRemovePeerCoordinatorKillPoints.AFTER_NOTIFY_PEERS_TO_REMOVE_SYNC_LOG_CHANNEL);
 
@@ -500,14 +489,11 @@ public class IoTConsensus implements IConsensus {
     String previousPeerListStr = impl.getConfiguration().toString();
     for (Peer peer : impl.getConfiguration()) {
       if (!correctPeers.contains(peer)) {
-        try {
-          impl.removeSyncLogChannel(peer);
-        } catch (ConsensusGroupModifyPeerException e) {
+        if (!impl.removeSyncLogChannel(peer)) {
           logger.error(
               "[RESET PEER LIST] Failed to remove peer {}'s sync log channel 
from group {}",
               peer,
-              groupId,
-              e);
+              groupId);
         }
       }
     }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 300f3725de5..ef35ae62c99 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -546,8 +546,7 @@ public class IoTConsensusServerImpl {
     }
   }
 
-  public void notifyPeersToRemoveSyncLogChannel(Peer targetPeer)
-      throws ConsensusGroupModifyPeerException {
+  public void notifyPeersToRemoveSyncLogChannel(Peer targetPeer) {
     // The configuration will be modified during iterating because we will add 
the targetPeer to
     // configuration
     ImmutableList<Peer> currentMembers = 
ImmutableList.copyOf(this.configuration);
@@ -568,12 +567,14 @@ public class IoTConsensusServerImpl {
                       targetPeer.getEndpoint(),
                       targetPeer.getNodeId()));
           if (!isSuccess(res.status)) {
-            throw new ConsensusGroupModifyPeerException(
-                String.format("remove sync log channel failed from %s to %s", 
peer, targetPeer));
+            logger.warn("removing sync log channel failed from {} to {}", 
peer, targetPeer);
           }
         } catch (Exception e) {
-          throw new ConsensusGroupModifyPeerException(
-              String.format("error when removing sync log channel to %s", 
peer), e);
+          logger.warn(
+              "Exception happened during removing sync log channel from {} to 
{}",
+              peer,
+              targetPeer,
+              e);
         }
       }
     }
@@ -647,20 +648,33 @@ public class IoTConsensusServerImpl {
     logger.info("[IoTConsensus] persist new configuration: {}", configuration);
   }
 
-  public void removeSyncLogChannel(Peer targetPeer) throws 
ConsensusGroupModifyPeerException {
+  /**
+   * @return totally succeed
+   */
+  public boolean removeSyncLogChannel(Peer targetPeer) {
+    // step 1, remove sync channel in LogDispatcher
+    boolean exceptionHappened = false;
+    String suggestion = "";
     try {
-      // step 1, remove sync channel in LogDispatcher
       logDispatcher.removeLogDispatcherThread(targetPeer);
-      logger.info("[IoTConsensus] log dispatcher to {} removed and cleanup", 
targetPeer);
-      // step 2, update configuration
-      configuration.remove(targetPeer);
-      checkAndUpdateSafeDeletedSearchIndex();
-      // step 3, persist configuration
-      persistConfiguration();
-      logger.info("[IoTConsensus] configuration updated to {}", 
this.configuration);
-    } catch (IOException e) {
-      throw new ConsensusGroupModifyPeerException("error when remove 
LogDispatcherThread", e);
+    } catch (Exception e) {
+      logger.warn(
+          "[IoTConsensus] Exception happened during removing log dispatcher 
thread, but configuration.dat will still be removed.",
+          e);
+      suggestion = "It's suggested restart the DataNode to remove log 
dispatcher thread.";
+      exceptionHappened = true;
+    }
+    if (!exceptionHappened) {
+      logger.info(
+          "[IoTConsensus] Log dispatcher thread to {} has been removed and 
cleanup", targetPeer);
     }
+    // step 2, update configuration
+    configuration.remove(targetPeer);
+    checkAndUpdateSafeDeletedSearchIndex();
+    // step 3, persist configuration
+    persistConfiguration();
+    logger.info("[IoTConsensus] Configuration updated to {}. {}", 
this.configuration, suggestion);
+    return !exceptionHappened;
   }
 
   public void persistConfiguration() {
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index a609acd2651..6201ec06ef6 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -208,12 +208,11 @@ public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.Ifa
       return new TRemoveSyncLogChannelRes(status);
     }
     TSStatus responseStatus;
-    try {
-      impl.removeSyncLogChannel(new Peer(groupId, req.nodeId, req.endPoint));
+    if (impl.removeSyncLogChannel(new Peer(groupId, req.nodeId, 
req.endPoint))) {
       responseStatus = new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-    } catch (ConsensusGroupModifyPeerException e) {
+    } else {
       responseStatus = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
-      responseStatus.setMessage(e.getMessage());
+      responseStatus.setMessage("remove sync log channel failed");
     }
     return new TRemoveSyncLogChannelRes(responseStatus);
   }

Reply via email to