This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b7e9d6ec3d5 Some region migration related work (#12376)
b7e9d6ec3d5 is described below

commit b7e9d6ec3d505e9e0730b8d27d330813ff1b5ce2
Author: Li Yu Heng <[email protected]>
AuthorDate: Tue Apr 23 18:27:29 2024 +0800

    Some region migration related work (#12376)
---
 .../iotdb/confignode/manager/ProcedureManager.java |  47 ++++----
 .../iotdb/confignode/manager/load/LoadManager.java |   4 +
 .../manager/load/balancer/RouteBalancer.java       |   4 +-
 .../partition/DatabasePartitionTable.java          |  11 +-
 .../persistence/partition/PartitionInfo.java       |  11 +-
 .../procedure/env/RegionMaintainHandler.java       | 118 ++++++++++-----------
 .../impl/region/AddRegionPeerProcedure.java        |  28 +++--
 .../impl/region/RemoveRegionPeerProcedure.java     |   2 +-
 .../consensus/iot/IoTConsensusServerImpl.java      |  28 +++--
 .../iotdb/db/service/RegionMigrateService.java     |   6 ++
 .../org/apache/iotdb/commons/utils/FileUtils.java  |  18 ++++
 11 files changed, 157 insertions(+), 120 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 820b59c3810..86c74e3e98d 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -52,7 +52,6 @@ import org.apache.iotdb.confignode.procedure.ProcedureExecutor;
 import org.apache.iotdb.confignode.procedure.ProcedureMetrics;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler;
-import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.impl.cq.CreateCQProcedure;
 import org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
 import 
org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure;
@@ -546,27 +545,6 @@ public class ProcedureManager {
   }
 
   // region region migration
-
-  private TConsensusGroupId regionIdToTConsensusGroupId(final int regionId)
-      throws ProcedureException {
-    if (configManager
-        .getPartitionManager()
-        .isRegionGroupExists(new 
TConsensusGroupId(TConsensusGroupType.SchemaRegion, regionId))) {
-      return new TConsensusGroupId(TConsensusGroupType.SchemaRegion, regionId);
-    }
-    if (configManager
-        .getPartitionManager()
-        .isRegionGroupExists(new 
TConsensusGroupId(TConsensusGroupType.DataRegion, regionId))) {
-      return new TConsensusGroupId(TConsensusGroupType.DataRegion, regionId);
-    }
-    String msg =
-        String.format(
-            "Submit RegionMigrateProcedure failed, because RegionGroup: %s 
doesn't exist",
-            regionId);
-    LOGGER.warn(msg);
-    throw new ProcedureException(msg);
-  }
-
   private TSStatus checkRegionMigrate(
       TMigrateRegionReq migrateRegionReq,
       TConsensusGroupId regionGroupId,
@@ -574,7 +552,28 @@ public class ProcedureManager {
       TDataNodeLocation destDataNode,
       TDataNodeLocation coordinatorForAddPeer) {
     String failMessage = null;
-    if (originalDataNode == null) {
+    Optional<Procedure<ConfigNodeProcedureEnv>> anotherMigrateProcedure =
+        this.executor.getProcedures().values().stream()
+            .filter(
+                procedure -> {
+                  if (procedure instanceof RegionMigrateProcedure) {
+                    return !procedure.isFinished()
+                        && ((RegionMigrateProcedure) procedure)
+                            .getConsensusGroupId()
+                            .equals(regionGroupId);
+                  }
+                  return false;
+                })
+            .findAny();
+    if (anotherMigrateProcedure.isPresent()) {
+      failMessage =
+          String.format(
+              "Submit RegionMigrateProcedure failed, "
+                  + "because another RegionMigrateProcedure of the same 
consensus group %d is already in processing. "
+                  + "A consensus group is able to have at most 1 
RegionMigrateProcedure at the same time"
+                  + "For further information, you can search [pid%d] in log.",
+              regionGroupId, anotherMigrateProcedure.get().getProcId());
+    } else if (originalDataNode == null) {
       failMessage =
           String.format(
               "Submit RegionMigrateProcedure failed, because no original 
DataNode %d",
@@ -626,7 +625,7 @@ public class ProcedureManager {
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
-  public TSStatus migrateRegion(TMigrateRegionReq migrateRegionReq) {
+  public synchronized TSStatus migrateRegion(TMigrateRegionReq 
migrateRegionReq) {
     TConsensusGroupId regionGroupId;
     Optional<TConsensusGroupId> optional =
         configManager
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 1d53d101b38..b115aee68aa 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -437,4 +437,8 @@ public class LoadManager {
   public LoadCache getLoadCache() {
     return loadCache;
   }
+
+  public RouteBalancer getRouteBalancer() {
+    return routeBalancer;
+  }
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index 40e210c7f6f..30ca728e7a9 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -139,7 +139,7 @@ public class RouteBalancer implements IClusterStatusSubscriber {
   }
 
   /** Balance cluster RegionGroup leader distribution through configured 
algorithm. */
-  private synchronized void balanceRegionLeader() {
+  public synchronized void balanceRegionLeader() {
     if (IS_ENABLE_AUTO_LEADER_BALANCE_FOR_SCHEMA_REGION) {
       balanceRegionLeader(TConsensusGroupType.SchemaRegion, 
SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS);
     }
@@ -241,7 +241,7 @@ public class RouteBalancer implements IClusterStatusSubscriber {
   }
 
   /** Balance cluster RegionGroup route priority through configured algorithm. 
*/
-  private synchronized void balanceRegionPriority() {
+  public synchronized void balanceRegionPriority() {
     priorityMapLock.writeLock().lock();
     AtomicBoolean needBroadcast = new AtomicBoolean(false);
     Map<TConsensusGroupId, Pair<TRegionReplicaSet, TRegionReplicaSet>> 
differentPriorityMap =
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
index 2683e281412..8ad66b126d8 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
@@ -56,6 +56,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public class DatabasePartitionTable {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(DatabasePartitionTable.class);
@@ -113,14 +114,8 @@ public class DatabasePartitionTable {
   }
 
   /** @return Deep copy of all Regions' RegionReplicaSet within one 
StorageGroup */
-  public List<TRegionReplicaSet> getAllReplicaSets() {
-    List<TRegionReplicaSet> result = new ArrayList<>();
-
-    for (RegionGroup regionGroup : regionGroupMap.values()) {
-      result.add(regionGroup.getReplicaSet());
-    }
-
-    return result;
+  public Stream<TRegionReplicaSet> getAllReplicaSets() {
+    return regionGroupMap.values().stream().map(RegionGroup::getReplicaSet);
   }
 
   /**
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index 2e4ac434b2b..9fdf66873c4 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -669,12 +669,9 @@ public class PartitionInfo implements SnapshotProcessor {
    * @return Deep copy of all Regions' RegionReplicaSet
    */
   public List<TRegionReplicaSet> getAllReplicaSets() {
-    List<TRegionReplicaSet> result = new ArrayList<>();
-    databasePartitionTables
-        .values()
-        .forEach(
-            databasePartitionTable -> 
result.addAll(databasePartitionTable.getAllReplicaSets()));
-    return result;
+    return databasePartitionTables.values().stream()
+        .flatMap(DatabasePartitionTable::getAllReplicaSets)
+        .collect(Collectors.toList());
   }
 
   /**
@@ -704,7 +701,7 @@ public class PartitionInfo implements SnapshotProcessor {
    */
   public List<TRegionReplicaSet> getAllReplicaSets(String database) {
     if (databasePartitionTables.containsKey(database)) {
-      return databasePartitionTables.get(database).getAllReplicaSets();
+      return 
databasePartitionTables.get(database).getAllReplicaSets().collect(Collectors.toList());
     } else {
       return Collections.emptyList();
     }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
index 5f44c1fdae4..326787449df 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
@@ -44,10 +44,12 @@ import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNo
 import 
org.apache.iotdb.confignode.consensus.request.write.partition.AddRegionLocationPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.partition.RemoveRegionLocationPlan;
 import 
org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
+import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import 
org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupHeartbeatSample;
 import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
 import org.apache.iotdb.confignode.persistence.node.NodeInfo;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
 import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
@@ -73,6 +75,7 @@ import java.util.stream.Collectors;
 import static 
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REGION_MIGRATE_PROCESS;
 import static 
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
 import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS;
+import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;
 import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;
 
 public class RegionMaintainHandler {
@@ -219,8 +222,15 @@ public class RegionMaintainHandler {
 
     String storageGroup = 
configManager.getPartitionManager().getRegionStorageGroup(regionId);
     TCreatePeerReq req = new TCreatePeerReq(regionId, currentPeerNodes, 
storageGroup);
-    // TODO replace with real ttl
-    req.setTtl(Long.MAX_VALUE);
+    long ttl = Long.MAX_VALUE;
+    try {
+      ttl = 
configManager.getClusterSchemaManager().getDatabaseSchemaByName(storageGroup).getTTL();
+    } catch (DatabaseNotExistsException e) {
+      LOGGER.warn(
+          "Cannot find out the database which region {} belongs to, ttl will 
be set to Long.MAX_VALUE",
+          regionId);
+    }
+    req.setTtl(ttl);
 
     status =
         SyncDataNodeClientPool.getInstance()
@@ -229,12 +239,13 @@ public class RegionMaintainHandler {
                 req,
                 DataNodeRequestType.CREATE_NEW_REGION_PEER);
 
-    LOGGER.info(
-        "{}, Send action createNewRegionPeer finished, regionId: {}, 
newPeerDataNodeId: {}",
-        REGION_MIGRATE_PROCESS,
-        regionId,
-        getIdWithRpcEndpoint(destDataNode));
-    if (isFailed(status)) {
+    if (isSucceed(status)) {
+      LOGGER.info(
+          "{}, Send action createNewRegionPeer finished, regionId: {}, 
newPeerDataNodeId: {}",
+          REGION_MIGRATE_PROCESS,
+          regionId,
+          getIdWithRpcEndpoint(destDataNode));
+    } else {
       LOGGER.error(
           "{}, Send action createNewRegionPeer error, regionId: {}, 
newPeerDataNodeId: {}, result: {}",
           REGION_MIGRATE_PROCESS,
@@ -242,6 +253,7 @@ public class RegionMaintainHandler {
           getIdWithRpcEndpoint(destDataNode),
           status);
     }
+
     return status;
   }
 
@@ -354,19 +366,6 @@ public class RegionMaintainHandler {
     return status;
   }
 
-  public TSStatus resetPeerList(
-      TConsensusGroupId regionId,
-      List<TDataNodeLocation> correctDataNodeLocations,
-      TDataNodeLocation target) {
-    TSStatus status =
-        SyncDataNodeClientPool.getInstance()
-            .sendSyncRequestToDataNodeWithRetry(
-                target.getInternalEndPoint(),
-                new TResetPeerListReq(regionId, correctDataNodeLocations),
-                DataNodeRequestType.RESET_PEER_LIST);
-    return status;
-  }
-
   public Map<Integer, TSStatus> resetPeerList(
       TConsensusGroupId regionId,
       List<TDataNodeLocation> correctDataNodeLocations,
@@ -661,50 +660,49 @@ public class RegionMaintainHandler {
    * @param regionId The region to be migrated
    * @param originalDataNode The DataNode where the region locates
    */
-  public void changeRegionLeader(TConsensusGroupId regionId, TDataNodeLocation 
originalDataNode) {
+  public void transferRegionLeader(TConsensusGroupId regionId, 
TDataNodeLocation originalDataNode)
+      throws ProcedureException {
     Optional<TDataNodeLocation> newLeaderNode =
         filterDataNodeWithOtherRegionReplica(regionId, originalDataNode);
-
-    if (TConsensusGroupType.DataRegion.equals(regionId.getType())
-        && IOT_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass())) {
-      if (newLeaderNode.isPresent()) {
-        configManager
-            .getLoadManager()
-            .forceUpdateConsensusGroupCache(
-                Collections.singletonMap(
-                    regionId,
-                    new ConsensusGroupHeartbeatSample(
-                        System.nanoTime(), 
newLeaderNode.get().getDataNodeId())));
-        LOGGER.info(
-            "{}, Change region leader finished for IOT_CONSENSUS, regionId: 
{}, newLeaderNode: {}",
-            REGION_MIGRATE_PROCESS,
-            regionId,
-            newLeaderNode);
+    newLeaderNode.orElseThrow(() -> new ProcedureException("Cannot find the 
new leader"));
+
+    // ratis needs DataNode to do election by itself
+    long timestamp = System.nanoTime();
+    if (TConsensusGroupType.SchemaRegion.equals(regionId.getType())
+        || TConsensusGroupType.DataRegion.equals(regionId.getType())
+            && 
RATIS_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass())) {
+      final int MAX_RETRY_TIME = 10;
+      int retryTime = 0;
+      while (true) {
+        TRegionLeaderChangeResp resp =
+            SyncDataNodeClientPool.getInstance()
+                .changeRegionLeader(
+                    regionId, originalDataNode.getInternalEndPoint(), 
newLeaderNode.get());
+        if (resp.getStatus().getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          timestamp = resp.getConsensusLogicalTimestamp();
+          break;
+        }
+        if (retryTime++ > MAX_RETRY_TIME) {
+          throw new ProcedureException("Transfer leader fail");
+        }
+        LOGGER.warn("Call changeRegionLeader fail for the {} time", retryTime);
       }
-
-      return;
     }
 
-    if (newLeaderNode.isPresent()) {
-      TRegionLeaderChangeResp resp =
-          SyncDataNodeClientPool.getInstance()
-              .changeRegionLeader(
-                  regionId, originalDataNode.getInternalEndPoint(), 
newLeaderNode.get());
-      if (resp.getStatus().getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        configManager
-            .getLoadManager()
-            .forceUpdateConsensusGroupCache(
-                Collections.singletonMap(
-                    regionId,
-                    new ConsensusGroupHeartbeatSample(
-                        resp.getConsensusLogicalTimestamp(), 
newLeaderNode.get().getDataNodeId())));
-      }
-      LOGGER.info(
-          "{}, Change region leader finished for RATIS_CONSENSUS, regionId: 
{}, newLeaderNode: {}",
-          REGION_MIGRATE_PROCESS,
-          regionId,
-          newLeaderNode);
-    }
+    configManager
+        .getLoadManager()
+        .forceUpdateConsensusGroupCache(
+            Collections.singletonMap(
+                regionId,
+                new ConsensusGroupHeartbeatSample(timestamp, 
newLeaderNode.get().getDataNodeId())));
+    configManager.getLoadManager().getRouteBalancer().balanceRegionLeader();
+    configManager.getLoadManager().getRouteBalancer().balanceRegionPriority();
+
+    LOGGER.info(
+        "{}, Change region leader finished, regionId: {}, newLeaderNode: {}",
+        REGION_MIGRATE_PROCESS,
+        regionId,
+        newLeaderNode);
   }
 
   /**
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
index 24d629c48ba..bebf03f6792 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
@@ -44,7 +44,6 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -175,9 +174,15 @@ public class AddRegionPeerProcedure
             .orElseThrow(
                 () ->
                     new ProcedureException(
-                        "Cannot roll back, because cannot find the correct 
locations"))
+                        "[pid{}][AddRegion] Cannot roll back, because cannot 
find the correct locations"))
             .getDataNodeLocations();
-
+    if (correctDataNodeLocations.remove(destDataNode)) {
+      LOGGER.warn(
+          "[pid{}][AddRegion] It appears that consensus write has not modified 
the local partition table. "
+              + "Please verify whether a leader change has occurred during 
this stage. "
+              + "If this log is triggered without a leader change, it 
indicates a potential bug in the partition table.",
+          getProcId());
+    }
     String correctStr =
         correctDataNodeLocations.stream()
             .map(TDataNodeLocation::getDataNodeId)
@@ -185,16 +190,19 @@ public class AddRegionPeerProcedure
             .toString();
     List<TDataNodeLocation> relatedDataNodeLocations = new 
ArrayList<>(correctDataNodeLocations);
     relatedDataNodeLocations.add(destDataNode);
-    Map<Integer, TDataNodeLocation> relatedDataNodeLocationMap = new 
HashMap<>();
-    relatedDataNodeLocations.forEach(
-        location -> relatedDataNodeLocationMap.put(location.dataNodeId, 
location));
+    Map<Integer, TDataNodeLocation> relatedDataNodeLocationMap =
+        relatedDataNodeLocations.stream()
+            .collect(
+                Collectors.toMap(
+                    TDataNodeLocation::getDataNodeId, dataNodeLocation -> 
dataNodeLocation));
     LOGGER.info(
-        "[pid{}][AddRegion] Will reset peer list of consensus group {} on 
DataNode {}",
+        "[pid{}][AddRegion] reset peer list: peer list of consensus group {} 
on DataNode {} will be reset to {}",
         getProcId(),
         consensusGroupId,
-        relatedDataNodeLocations.stream()
+        relatedDataNodeLocationMap.values().stream()
             .map(TDataNodeLocation::getDataNodeId)
-            .collect(Collectors.toList()));
+            .collect(Collectors.toList()),
+        correctStr);
 
     Map<Integer, TSStatus> resultMap =
         handler.resetPeerList(
@@ -204,7 +212,7 @@ public class AddRegionPeerProcedure
         (dataNodeId, resetResult) -> {
           if (resetResult.getCode() == SUCCESS_STATUS.getStatusCode()) {
             LOGGER.info(
-                "[pid{}][AddRegion] reset peer list: peer list of consensus 
group {} on DataNode {} has been successfully to {}",
+                "[pid{}][AddRegion] reset peer list: peer list of consensus 
group {} on DataNode {} has been successfully reset to {}",
                 getProcId(),
                 consensusGroupId,
                 dataNodeId,
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
index e459161ca90..17fdc903295 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
@@ -90,7 +90,7 @@ public class RemoveRegionPeerProcedure
               consensusGroupId.getId(),
               targetDataNode.getDataNodeId());
           handler.updateRegionCache(consensusGroupId, targetDataNode, 
RegionStatus.Removing);
-          handler.changeRegionLeader(consensusGroupId, targetDataNode);
+          handler.transferRegionLeader(consensusGroupId, targetDataNode);
           KillPoint.setKillPoint(state);
           setNextState(REMOVE_REGION_PEER);
           break;
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index e2d02b75338..e886ad51b4c 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -98,6 +98,8 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static 
org.apache.iotdb.commons.utils.FileUtils.humanReadableByteCountSI;
+
 public class IoTConsensusServerImpl {
 
   private static final String CONFIGURATION_FILE_NAME = "configuration.dat";
@@ -295,13 +297,20 @@ public class IoTConsensusServerImpl {
     File snapshotDir = new File(storageDir, newSnapshotDirName);
     List<Path> snapshotPaths = stateMachine.getSnapshotFiles(snapshotDir);
     AtomicLong snapshotSizeSumAtomic = new AtomicLong();
+    StringBuilder allFilesStr = new StringBuilder();
     snapshotPaths.forEach(
-        snapshotPath -> {
+        path -> {
           try {
-            snapshotSizeSumAtomic.addAndGet(Files.size(snapshotPath));
+            long fileSize = Files.size(path);
+            snapshotSizeSumAtomic.addAndGet(fileSize);
+            allFilesStr
+                .append("\n")
+                .append(path)
+                .append(" ")
+                .append(humanReadableByteCountSI(fileSize));
           } catch (IOException e) {
             logger.error(
-                "[SNAPSHOT TRANSMISSION] Calculate snapshot file's size fail: 
{}", snapshotPath, e);
+                "[SNAPSHOT TRANSMISSION] Calculate snapshot file's size fail: 
{}", path, e);
           }
         });
     final long snapshotSizeSum = snapshotSizeSumAtomic.get();
@@ -311,8 +320,10 @@ public class IoTConsensusServerImpl {
     logger.info(
         "[SNAPSHOT TRANSMISSION] Start to transmit snapshots ({} files, total 
size {}) from dir {}",
         snapshotPaths.size(),
-        FileUtils.byteCountToDisplaySize(snapshotSizeSum),
+        humanReadableByteCountSI(snapshotSizeSum),
         snapshotDir);
+    logger.info(
+        "[SNAPSHOT TRANSMISSION] All the files below shell be transmitted: 
{}", allFilesStr);
     try (SyncIoTConsensusServiceClient client =
         syncClientManager.borrowClient(targetPeer.getEndpoint())) {
       for (Path path : snapshotPaths) {
@@ -334,14 +345,15 @@ public class IoTConsensusServerImpl {
           transitedSnapshotSizeSum += reader.getTotalReadSize();
           transitedFilesNum++;
           logger.info(
-              "[SNAPSHOT TRANSMISSION] The overall progress for dir {}: files 
{}/{} done, size {}/{} done, time {} passed",
+              "[SNAPSHOT TRANSMISSION] The overall progress for dir {}: files 
{}/{} done, size {}/{} done, time {} passed. File {} done.",
               newSnapshotDirName,
               transitedFilesNum,
               snapshotPaths.size(),
-              FileUtils.byteCountToDisplaySize(transitedSnapshotSizeSum),
-              FileUtils.byteCountToDisplaySize(snapshotSizeSum),
+              humanReadableByteCountSI(transitedSnapshotSizeSum),
+              humanReadableByteCountSI(snapshotSizeSum),
               CommonDateTimeUtils.convertMillisecondToDurationStr(
-                  (System.nanoTime() - startTime) / 1_000_000));
+                  (System.nanoTime() - startTime) / 1_000_000),
+              path);
         } finally {
           reader.close();
         }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
index 11a47d7dc20..5fea6c0a008 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import 
org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
 import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
@@ -180,6 +181,11 @@ public class RegionMigrateService implements IService {
       } else {
         SchemaRegionConsensusImpl.getInstance().resetPeerList(regionId, 
correctPeers);
       }
+    } catch (ConsensusGroupNotExistException e) {
+      LOGGER.warn(
+          "Reset peer list fail, this DataNode not contains peer of consensus 
group {}. Maybe caused by create local peer failure.",
+          regionId,
+          e);
     } catch (ConsensusException e) {
       LOGGER.error("reset peer list fail", e);
       return new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
index f2788381b4a..ee2f5098642 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
@@ -35,6 +35,8 @@ import java.nio.file.FileSystems;
 import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
+import java.text.CharacterIterator;
+import java.text.StringCharacterIterator;
 import java.util.Arrays;
 import java.util.Objects;
 
@@ -286,4 +288,20 @@ public class FileUtils {
     Files.copy(sourceFile.toPath(), targetFile.toPath());
     return targetFile;
   }
+
+  /**
+   * Transfer bytes to human-readable string. Copy from <a
+   * href="https://stackoverflow.com/a/3758880">stackoverflow</a>.
+   */
+  public static String humanReadableByteCountSI(long bytes) {
+    if (-1000 < bytes && bytes < 1000) {
+      return bytes + " B";
+    }
+    CharacterIterator ci = new StringCharacterIterator("KMGTPE");
+    while (bytes <= -999_950 || bytes >= 999_950) {
+      bytes /= 1000;
+      ci.next();
+    }
+    return String.format("%.2f %cB", bytes / 1000.0, ci.current());
+  }
 }

Reply via email to