This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 61dea8bb005 [to dev/1.3][region migration] Remove retry of RegionMigrateService::addPeer (#14409)
61dea8bb005 is described below

commit 61dea8bb005b96ba8623ffb7a9727c649788aa43
Author: Li Yu Heng <[email protected]>
AuthorDate: Fri Dec 13 12:42:51 2024 +0800

    [to dev/1.3][region migration] Remove retry of RegionMigrateService::addPeer (#14409)
    
    * [region migration] Remove retry of RegionMigrateService::addPeer (#14362)
    
    * done
    
    * synchronized & snapshot name
    
    * remove test
    
    * for removePeer and resetPeerList
    
    * name
---
 .../apache/iotdb/consensus/iot/IoTConsensus.java   | 157 +++++++++++----------
 .../consensus/iot/IoTConsensusServerImpl.java      |  20 +--
 .../apache/iotdb/consensus/iot/StabilityTest.java  |  36 -----
 .../iotdb/db/service/RegionMigrateService.java     |  49 +++----
 4 files changed, 103 insertions(+), 159 deletions(-)

diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 64e27ec7102..98f25d0716f 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -296,53 +296,55 @@ public class IoTConsensus implements IConsensus {
     IoTConsensusServerImpl impl =
         Optional.ofNullable(stateMachineMap.get(groupId))
             .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
-    if (impl.getConfiguration().contains(peer)) {
-      throw new PeerAlreadyInConsensusGroupException(groupId, peer);
-    }
-    try {
-      // step 1: inactive new Peer to prepare for following steps
-      logger.info("[IoTConsensus] inactivate new peer: {}", peer);
-      impl.inactivePeer(peer, false);
+    synchronized (impl) {
+      if (impl.getConfiguration().contains(peer)) {
+        throw new PeerAlreadyInConsensusGroupException(groupId, peer);
+      }
+      try {
+        // step 1: inactive new Peer to prepare for following steps
+        logger.info("[IoTConsensus] inactivate new peer: {}", peer);
+        impl.inactivePeer(peer, false);
 
-      // step 2: notify all the other Peers to build the sync connection to 
newPeer
-      logger.info("[IoTConsensus] notify current peers to build sync log...");
-      impl.notifyPeersToBuildSyncLogChannel(peer);
+        // step 2: notify all the other Peers to build the sync connection to 
newPeer
+        logger.info("[IoTConsensus] notify current peers to build sync 
log...");
+        impl.notifyPeersToBuildSyncLogChannel(peer);
 
-      // step 3: take snapshot
-      logger.info("[IoTConsensus] start to take snapshot...");
+        // step 3: take snapshot
+        logger.info("[IoTConsensus] start to take snapshot...");
 
-      impl.takeSnapshot();
+        impl.takeSnapshot();
 
-      // step 4: transit snapshot
-      logger.info("[IoTConsensus] start to transmit snapshot...");
-      impl.transmitSnapshot(peer);
+        // step 4: transit snapshot
+        logger.info("[IoTConsensus] start to transmit snapshot...");
+        impl.transmitSnapshot(peer);
 
-      // step 5: let the new peer load snapshot
-      logger.info("[IoTConsensus] trigger new peer to load snapshot...");
-      impl.triggerSnapshotLoad(peer);
-      
KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_TRANSITION);
+        // step 5: let the new peer load snapshot
+        logger.info("[IoTConsensus] trigger new peer to load snapshot...");
+        impl.triggerSnapshotLoad(peer);
+        
KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_TRANSITION);
 
-      // step 6: active new Peer
-      logger.info("[IoTConsensus] activate new peer...");
-      impl.activePeer(peer);
+        // step 6: active new Peer
+        logger.info("[IoTConsensus] activate new peer...");
+        impl.activePeer(peer);
+
+        // step 7: notify remote peer to clean up transferred snapshot
+        logger.info("[IoTConsensus] clean up remote snapshot...");
+        try {
+          impl.cleanupRemoteSnapshot(peer);
+        } catch (ConsensusGroupModifyPeerException e) {
+          logger.warn("[IoTConsensus] failed to cleanup remote snapshot", e);
+        }
+        KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_DONE);
 
-      // step 7: notify remote peer to clean up transferred snapshot
-      logger.info("[IoTConsensus] clean up remote snapshot...");
-      try {
-        impl.cleanupRemoteSnapshot(peer);
       } catch (ConsensusGroupModifyPeerException e) {
-        logger.warn("[IoTConsensus] failed to cleanup remote snapshot", e);
+        logger.info("[IoTConsensus] add remote peer failed, automatic cleanup 
side effects...");
+        // try to clean up the sync log channel
+        impl.notifyPeersToRemoveSyncLogChannel(peer);
+        throw new ConsensusException(e);
+      } finally {
+        logger.info("[IoTConsensus] clean up local snapshot...");
+        impl.cleanupLocalSnapshot();
       }
-      KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_DONE);
-
-    } catch (ConsensusGroupModifyPeerException e) {
-      logger.info("[IoTConsensus] add remote peer failed, automatic cleanup 
side effects...");
-      // try to clean up the sync log channel
-      impl.notifyPeersToRemoveSyncLogChannel(peer);
-      throw new ConsensusException(e);
-    } finally {
-      logger.info("[IoTConsensus] clean up local snapshot...");
-      impl.cleanupLocalSnapshot();
     }
   }
 
@@ -352,29 +354,32 @@ public class IoTConsensus implements IConsensus {
         Optional.ofNullable(stateMachineMap.get(groupId))
             .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
 
-    if (!impl.getConfiguration().contains(peer)) {
-      throw new PeerNotInConsensusGroupException(groupId, peer.toString());
-    }
+    synchronized (impl) {
+      if (!impl.getConfiguration().contains(peer)) {
+        throw new PeerNotInConsensusGroupException(groupId, peer.toString());
+      }
 
-    KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.INIT);
+      KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.INIT);
 
-    // let other peers remove the sync channel with target peer
-    impl.notifyPeersToRemoveSyncLogChannel(peer);
-    KillPoint.setKillPoint(
-        
IoTConsensusRemovePeerCoordinatorKillPoints.AFTER_NOTIFY_PEERS_TO_REMOVE_SYNC_LOG_CHANNEL);
+      // let other peers remove the sync channel with target peer
+      impl.notifyPeersToRemoveSyncLogChannel(peer);
+      KillPoint.setKillPoint(
+          IoTConsensusRemovePeerCoordinatorKillPoints
+              .AFTER_NOTIFY_PEERS_TO_REMOVE_SYNC_LOG_CHANNEL);
 
-    try {
-      // let target peer reject new write
-      impl.inactivePeer(peer, true);
-      
KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.AFTER_INACTIVE_PEER);
-      // wait its SyncLog to complete
-      impl.waitTargetPeerUntilSyncLogCompleted(peer);
-      // wait its region related resource to release
-      impl.waitReleaseAllRegionRelatedResource(peer);
-    } catch (ConsensusGroupModifyPeerException e) {
-      throw new ConsensusException(e.getMessage());
+      try {
+        // let target peer reject new write
+        impl.inactivePeer(peer, true);
+        
KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.AFTER_INACTIVE_PEER);
+        // wait its SyncLog to complete
+        impl.waitTargetPeerUntilSyncLogCompleted(peer);
+        // wait its region related resource to release
+        impl.waitReleaseAllRegionRelatedResource(peer);
+      } catch (ConsensusGroupModifyPeerException e) {
+        throw new ConsensusException(e.getMessage());
+      }
+      
KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.FINISH);
     }
-    KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.FINISH);
   }
 
   @Override
@@ -485,6 +490,7 @@ public class IoTConsensus implements IConsensus {
     IoTConsensusServerImpl impl =
         Optional.ofNullable(stateMachineMap.get(groupId))
             .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
+
     Peer localPeer = new Peer(groupId, thisNodeId, thisNode);
     if (!correctPeers.contains(localPeer)) {
       logger.info(
@@ -493,25 +499,28 @@ public class IoTConsensus implements IConsensus {
       deleteLocalPeer(groupId);
       return;
     }
-    ImmutableList<Peer> currentMembers = 
ImmutableList.copyOf(impl.getConfiguration());
-    String previousPeerListStr = currentMembers.toString();
-    for (Peer peer : currentMembers) {
-      if (!correctPeers.contains(peer)) {
-        if (!impl.removeSyncLogChannel(peer)) {
-          logger.error(
-              "[RESET PEER LIST] Failed to remove peer {}'s sync log channel 
from group {}",
-              peer,
-              groupId);
+
+    synchronized (impl) {
+      ImmutableList<Peer> currentMembers = 
ImmutableList.copyOf(impl.getConfiguration());
+      String previousPeerListStr = currentMembers.toString();
+      for (Peer peer : currentMembers) {
+        if (!correctPeers.contains(peer)) {
+          if (!impl.removeSyncLogChannel(peer)) {
+            logger.error(
+                "[RESET PEER LIST] Failed to remove peer {}'s sync log channel 
from group {}",
+                peer,
+                groupId);
+          }
         }
       }
-    }
-    logger.info(
-        "[RESET PEER LIST] Local peer list has been reset: {} -> {}",
-        previousPeerListStr,
-        impl.getConfiguration());
-    for (Peer peer : correctPeers) {
-      if (!impl.getConfiguration().contains(peer)) {
-        logger.warn("[RESET PEER LIST] \"Correct peer\" {} is not in local 
peer list", peer);
+      logger.info(
+          "[RESET PEER LIST] Local peer list has been reset: {} -> {}",
+          previousPeerListStr,
+          impl.getConfiguration());
+      for (Peer peer : correctPeers) {
+        if (!impl.getConfiguration().contains(peer)) {
+          logger.warn("[RESET PEER LIST] \"Correct peer\" {} is not in local 
peer list", peer);
+        }
       }
     }
   }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 57256b39b6a..8ea522aea27 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -91,6 +91,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -276,10 +277,9 @@ public class IoTConsensusServerImpl {
 
   public void takeSnapshot() throws ConsensusGroupModifyPeerException {
     try {
-      long newSnapshotIndex = getLatestSnapshotIndex() + 1;
       newSnapshotDirName =
           String.format(
-              "%s_%s_%d", SNAPSHOT_DIR_NAME, thisNode.getGroupId().getId(), 
newSnapshotIndex);
+              "%s_%s_%s", SNAPSHOT_DIR_NAME, thisNode.getGroupId().getId(), 
UUID.randomUUID());
       File snapshotDir = new File(storageDir, newSnapshotDirName);
       if (snapshotDir.exists()) {
         FileUtils.deleteDirectory(snapshotDir);
@@ -400,22 +400,6 @@ public class IoTConsensusServerImpl {
     return originalFilePath.substring(originalFilePath.indexOf(snapshotId));
   }
 
-  private long getLatestSnapshotIndex() {
-    long snapShotIndex = 0;
-    File directory = new File(storageDir);
-    File[] versionFiles = directory.listFiles((dir, name) -> 
name.startsWith(SNAPSHOT_DIR_NAME));
-    if (versionFiles == null || versionFiles.length == 0) {
-      return snapShotIndex;
-    }
-    for (File file : versionFiles) {
-      snapShotIndex =
-          Math.max(
-              snapShotIndex,
-              
Long.parseLong(SNAPSHOT_INDEX_PATTEN.matcher(file.getName()).replaceAll("")));
-    }
-    return snapShotIndex;
-  }
-
   private void clearOldSnapshot() {
     File directory = new File(storageDir);
     File[] versionFiles = directory.listFiles((dir, name) -> 
name.startsWith(SNAPSHOT_DIR_NAME));
diff --git 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
index 43e328e3833..d7675084680 100644
--- 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
+++ 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
 import org.apache.iotdb.consensus.exception.ConsensusException;
 import 
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
-import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
 import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
@@ -95,7 +94,6 @@ public class StabilityTest {
     peerTest();
     transferLeader();
     snapshotTest();
-    snapshotUpgradeTest();
   }
 
   public void addConsensusGroup() {
@@ -212,38 +210,4 @@ public class StabilityTest {
     Assert.assertNotEquals(versionFiles1[0].getName(), 
versionFiles2[0].getName());
     consensusImpl.deleteLocalPeer(dataRegionId);
   }
-
-  public void snapshotUpgradeTest() throws Exception {
-    consensusImpl.createLocalPeer(
-        dataRegionId,
-        Collections.singletonList(new Peer(dataRegionId, 1, new 
TEndPoint("0.0.0.0", basePort))));
-    consensusImpl.triggerSnapshot(dataRegionId, false);
-    long oldSnapshotIndex = System.currentTimeMillis();
-    String oldSnapshotDirName =
-        String.format(
-            "%s_%s_%d",
-            IoTConsensusServerImpl.SNAPSHOT_DIR_NAME, dataRegionId.getId(), 
oldSnapshotIndex);
-    File regionDir = new File(storageDir, "1_1");
-    File oldSnapshotDir = new File(regionDir, oldSnapshotDirName);
-    if (oldSnapshotDir.exists()) {
-      FileUtils.deleteFully(oldSnapshotDir);
-    }
-    if (!oldSnapshotDir.mkdirs()) {
-      throw new ConsensusGroupModifyPeerException(
-          String.format("%s: cannot mkdir for snapshot", dataRegionId));
-    }
-    consensusImpl.triggerSnapshot(dataRegionId, false);
-    Assert.assertFalse(oldSnapshotDir.exists());
-
-    File dataDir = new File(IoTConsensus.buildPeerDir(storageDir, 
dataRegionId));
-
-    File[] snapshotFiles =
-        dataDir.listFiles((dir, name) -> 
name.startsWith(IoTConsensusServerImpl.SNAPSHOT_DIR_NAME));
-    Assert.assertNotNull(snapshotFiles);
-    Assert.assertEquals(1, snapshotFiles.length);
-    Assert.assertEquals(
-        oldSnapshotIndex + 1,
-        
Long.parseLong(snapshotFiles[0].getName().replaceAll(".*[^\\d](?=(\\d+))", 
"")));
-    consensusImpl.deleteLocalPeer(dataRegionId);
-  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
index c6551c0d54d..056df802c13 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
@@ -256,42 +256,29 @@ public class RegionMigrateService implements IService {
       TEndPoint destEndpoint = getConsensusEndPoint(destDataNode, regionId);
       boolean addPeerSucceed = true;
       Throwable throwable = null;
-      for (int i = 0; i < MAX_RETRY_NUM; i++) {
-        try {
-          if (!addPeerSucceed) {
-            Thread.sleep(SLEEP_MILLIS);
-          }
-          addRegionPeer(regionId, new Peer(regionId, 
destDataNode.getDataNodeId(), destEndpoint));
-          addPeerSucceed = true;
-        } catch (PeerAlreadyInConsensusGroupException e) {
-          addPeerSucceed = true;
-        } catch (InterruptedException e) {
-          throwable = e;
-          Thread.currentThread().interrupt();
-        } catch (ConsensusException e) {
-          addPeerSucceed = false;
-          throwable = e;
-          taskLogger.error(
-              "{}, executed addPeer {} for region {} error, retry times: {}",
-              REGION_MIGRATE_PROCESS,
-              destEndpoint,
-              regionId,
-              i,
-              e);
-        } catch (Exception e) {
-          addPeerSucceed = false;
-          throwable = e;
-          taskLogger.warn("Unexpected exception", e);
-        }
-        if (addPeerSucceed || throwable instanceof InterruptedException) {
-          break;
-        }
+      try {
+        addRegionPeer(regionId, new Peer(regionId, 
destDataNode.getDataNodeId(), destEndpoint));
+      } catch (PeerAlreadyInConsensusGroupException ignore) {
+
+      } catch (ConsensusException e) {
+        addPeerSucceed = false;
+        throwable = e;
+        taskLogger.error(
+            "{}, executed addPeer {} for region {} error",
+            REGION_MIGRATE_PROCESS,
+            destEndpoint,
+            regionId,
+            e);
+      } catch (Exception e) {
+        addPeerSucceed = false;
+        throwable = e;
+        taskLogger.warn("Unexpected exception", e);
       }
 
       if (!addPeerSucceed) {
         String errorMsg =
             String.format(
-                "%s, AddPeer for region error after max retry times, peerId: 
%s, regionId: %s",
+                "%s, AddPeer for region error, peerId: %s, regionId: %s",
                 REGION_MIGRATE_PROCESS, destEndpoint, regionId);
         taskLogger.error(errorMsg, throwable);
         status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());

Reply via email to