This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b7e9d6ec3d5 Some region migration related work (#12376)
b7e9d6ec3d5 is described below
commit b7e9d6ec3d505e9e0730b8d27d330813ff1b5ce2
Author: Li Yu Heng <[email protected]>
AuthorDate: Tue Apr 23 18:27:29 2024 +0800
Some region migration related work (#12376)
---
.../iotdb/confignode/manager/ProcedureManager.java | 47 ++++----
.../iotdb/confignode/manager/load/LoadManager.java | 4 +
.../manager/load/balancer/RouteBalancer.java | 4 +-
.../partition/DatabasePartitionTable.java | 11 +-
.../persistence/partition/PartitionInfo.java | 11 +-
.../procedure/env/RegionMaintainHandler.java | 118 ++++++++++-----------
.../impl/region/AddRegionPeerProcedure.java | 28 +++--
.../impl/region/RemoveRegionPeerProcedure.java | 2 +-
.../consensus/iot/IoTConsensusServerImpl.java | 28 +++--
.../iotdb/db/service/RegionMigrateService.java | 6 ++
.../org/apache/iotdb/commons/utils/FileUtils.java | 18 ++++
11 files changed, 157 insertions(+), 120 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 820b59c3810..86c74e3e98d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -52,7 +52,6 @@ import
org.apache.iotdb.confignode.procedure.ProcedureExecutor;
import org.apache.iotdb.confignode.procedure.ProcedureMetrics;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler;
-import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.impl.cq.CreateCQProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
import
org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure;
@@ -546,27 +545,6 @@ public class ProcedureManager {
}
// region region migration
-
- private TConsensusGroupId regionIdToTConsensusGroupId(final int regionId)
- throws ProcedureException {
- if (configManager
- .getPartitionManager()
- .isRegionGroupExists(new
TConsensusGroupId(TConsensusGroupType.SchemaRegion, regionId))) {
- return new TConsensusGroupId(TConsensusGroupType.SchemaRegion, regionId);
- }
- if (configManager
- .getPartitionManager()
- .isRegionGroupExists(new
TConsensusGroupId(TConsensusGroupType.DataRegion, regionId))) {
- return new TConsensusGroupId(TConsensusGroupType.DataRegion, regionId);
- }
- String msg =
- String.format(
- "Submit RegionMigrateProcedure failed, because RegionGroup: %s
doesn't exist",
- regionId);
- LOGGER.warn(msg);
- throw new ProcedureException(msg);
- }
-
private TSStatus checkRegionMigrate(
TMigrateRegionReq migrateRegionReq,
TConsensusGroupId regionGroupId,
@@ -574,7 +552,28 @@ public class ProcedureManager {
TDataNodeLocation destDataNode,
TDataNodeLocation coordinatorForAddPeer) {
String failMessage = null;
- if (originalDataNode == null) {
+ Optional<Procedure<ConfigNodeProcedureEnv>> anotherMigrateProcedure =
+ this.executor.getProcedures().values().stream()
+ .filter(
+ procedure -> {
+ if (procedure instanceof RegionMigrateProcedure) {
+ return !procedure.isFinished()
+ && ((RegionMigrateProcedure) procedure)
+ .getConsensusGroupId()
+ .equals(regionGroupId);
+ }
+ return false;
+ })
+ .findAny();
+ if (anotherMigrateProcedure.isPresent()) {
+ failMessage =
+ String.format(
+ "Submit RegionMigrateProcedure failed, "
+ + "because another RegionMigrateProcedure of the same
consensus group %d is already in processing. "
+ + "A consensus group is able to have at most 1
RegionMigrateProcedure at the same time"
+ + "For further information, you can search [pid%d] in log.",
+ regionGroupId, anotherMigrateProcedure.get().getProcId());
+ } else if (originalDataNode == null) {
failMessage =
String.format(
"Submit RegionMigrateProcedure failed, because no original
DataNode %d",
@@ -626,7 +625,7 @@ public class ProcedureManager {
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
- public TSStatus migrateRegion(TMigrateRegionReq migrateRegionReq) {
+ public synchronized TSStatus migrateRegion(TMigrateRegionReq
migrateRegionReq) {
TConsensusGroupId regionGroupId;
Optional<TConsensusGroupId> optional =
configManager
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 1d53d101b38..b115aee68aa 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -437,4 +437,8 @@ public class LoadManager {
public LoadCache getLoadCache() {
return loadCache;
}
+
+ /** @return the {@link RouteBalancer} instance held in this LoadManager's routeBalancer field */
+ public RouteBalancer getRouteBalancer() {
+ return routeBalancer;
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index 40e210c7f6f..30ca728e7a9 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -139,7 +139,7 @@ public class RouteBalancer implements
IClusterStatusSubscriber {
}
/** Balance cluster RegionGroup leader distribution through configured
algorithm. */
- private synchronized void balanceRegionLeader() {
+ public synchronized void balanceRegionLeader() {
if (IS_ENABLE_AUTO_LEADER_BALANCE_FOR_SCHEMA_REGION) {
balanceRegionLeader(TConsensusGroupType.SchemaRegion,
SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS);
}
@@ -241,7 +241,7 @@ public class RouteBalancer implements
IClusterStatusSubscriber {
}
/** Balance cluster RegionGroup route priority through configured algorithm.
*/
- private synchronized void balanceRegionPriority() {
+ public synchronized void balanceRegionPriority() {
priorityMapLock.writeLock().lock();
AtomicBoolean needBroadcast = new AtomicBoolean(false);
Map<TConsensusGroupId, Pair<TRegionReplicaSet, TRegionReplicaSet>>
differentPriorityMap =
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
index 2683e281412..8ad66b126d8 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
@@ -56,6 +56,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
public class DatabasePartitionTable {
private static final Logger LOGGER =
LoggerFactory.getLogger(DatabasePartitionTable.class);
@@ -113,14 +114,8 @@ public class DatabasePartitionTable {
}
/** @return Deep copy of all Regions' RegionReplicaSet within one
StorageGroup */
- public List<TRegionReplicaSet> getAllReplicaSets() {
- List<TRegionReplicaSet> result = new ArrayList<>();
-
- for (RegionGroup regionGroup : regionGroupMap.values()) {
- result.add(regionGroup.getReplicaSet());
- }
-
- return result;
+ public Stream<TRegionReplicaSet> getAllReplicaSets() {
+ // Maps each RegionGroup in regionGroupMap to its replica set without building a new collection here.
+ // A java.util.stream.Stream can be consumed only once, so collect it when a List is needed.
+ return regionGroupMap.values().stream().map(RegionGroup::getReplicaSet);
}
/**
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index 2e4ac434b2b..9fdf66873c4 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -669,12 +669,9 @@ public class PartitionInfo implements SnapshotProcessor {
* @return Deep copy of all Regions' RegionReplicaSet
*/
public List<TRegionReplicaSet> getAllReplicaSets() {
- List<TRegionReplicaSet> result = new ArrayList<>();
- databasePartitionTables
- .values()
- .forEach(
- databasePartitionTable ->
result.addAll(databasePartitionTable.getAllReplicaSets()));
- return result;
+ // Flatten the per-database replica-set streams and collect them into one new List.
+ return databasePartitionTables.values().stream()
+ .flatMap(DatabasePartitionTable::getAllReplicaSets)
+ .collect(Collectors.toList());
}
/**
@@ -704,7 +701,7 @@ public class PartitionInfo implements SnapshotProcessor {
*/
public List<TRegionReplicaSet> getAllReplicaSets(String database) {
if (databasePartitionTables.containsKey(database)) {
- return databasePartitionTables.get(database).getAllReplicaSets();
+ return
databasePartitionTables.get(database).getAllReplicaSets().collect(Collectors.toList());
} else {
return Collections.emptyList();
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
index 5f44c1fdae4..326787449df 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
@@ -44,10 +44,12 @@ import
org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNo
import
org.apache.iotdb.confignode.consensus.request.write.partition.AddRegionLocationPlan;
import
org.apache.iotdb.confignode.consensus.request.write.partition.RemoveRegionLocationPlan;
import
org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
+import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
import org.apache.iotdb.confignode.manager.ConfigManager;
import
org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupHeartbeatSample;
import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
import org.apache.iotdb.confignode.persistence.node.NodeInfo;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
@@ -73,6 +75,7 @@ import java.util.stream.Collectors;
import static
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REGION_MIGRATE_PROCESS;
import static
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS;
+import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;
import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;
public class RegionMaintainHandler {
@@ -219,8 +222,15 @@ public class RegionMaintainHandler {
String storageGroup =
configManager.getPartitionManager().getRegionStorageGroup(regionId);
TCreatePeerReq req = new TCreatePeerReq(regionId, currentPeerNodes,
storageGroup);
- // TODO replace with real ttl
- req.setTtl(Long.MAX_VALUE);
+ long ttl = Long.MAX_VALUE;
+ try {
+ ttl =
configManager.getClusterSchemaManager().getDatabaseSchemaByName(storageGroup).getTTL();
+ } catch (DatabaseNotExistsException e) {
+ LOGGER.warn(
+ "Cannot find out the database which region {} belongs to, ttl will
be set to Long.MAX_VALUE",
+ regionId);
+ }
+ req.setTtl(ttl);
status =
SyncDataNodeClientPool.getInstance()
@@ -229,12 +239,13 @@ public class RegionMaintainHandler {
req,
DataNodeRequestType.CREATE_NEW_REGION_PEER);
- LOGGER.info(
- "{}, Send action createNewRegionPeer finished, regionId: {},
newPeerDataNodeId: {}",
- REGION_MIGRATE_PROCESS,
- regionId,
- getIdWithRpcEndpoint(destDataNode));
- if (isFailed(status)) {
+ if (isSucceed(status)) {
+ LOGGER.info(
+ "{}, Send action createNewRegionPeer finished, regionId: {},
newPeerDataNodeId: {}",
+ REGION_MIGRATE_PROCESS,
+ regionId,
+ getIdWithRpcEndpoint(destDataNode));
+ } else {
LOGGER.error(
"{}, Send action createNewRegionPeer error, regionId: {},
newPeerDataNodeId: {}, result: {}",
REGION_MIGRATE_PROCESS,
@@ -242,6 +253,7 @@ public class RegionMaintainHandler {
getIdWithRpcEndpoint(destDataNode),
status);
}
+
return status;
}
@@ -354,19 +366,6 @@ public class RegionMaintainHandler {
return status;
}
- public TSStatus resetPeerList(
- TConsensusGroupId regionId,
- List<TDataNodeLocation> correctDataNodeLocations,
- TDataNodeLocation target) {
- TSStatus status =
- SyncDataNodeClientPool.getInstance()
- .sendSyncRequestToDataNodeWithRetry(
- target.getInternalEndPoint(),
- new TResetPeerListReq(regionId, correctDataNodeLocations),
- DataNodeRequestType.RESET_PEER_LIST);
- return status;
- }
-
public Map<Integer, TSStatus> resetPeerList(
TConsensusGroupId regionId,
List<TDataNodeLocation> correctDataNodeLocations,
@@ -661,50 +660,49 @@ public class RegionMaintainHandler {
* @param regionId The region to be migrated
* @param originalDataNode The DataNode where the region locates
*/
- public void changeRegionLeader(TConsensusGroupId regionId, TDataNodeLocation
originalDataNode) {
+ public void transferRegionLeader(TConsensusGroupId regionId,
TDataNodeLocation originalDataNode)
+ throws ProcedureException {
Optional<TDataNodeLocation> newLeaderNode =
filterDataNodeWithOtherRegionReplica(regionId, originalDataNode);
-
- if (TConsensusGroupType.DataRegion.equals(regionId.getType())
- && IOT_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass())) {
- if (newLeaderNode.isPresent()) {
- configManager
- .getLoadManager()
- .forceUpdateConsensusGroupCache(
- Collections.singletonMap(
- regionId,
- new ConsensusGroupHeartbeatSample(
- System.nanoTime(),
newLeaderNode.get().getDataNodeId())));
- LOGGER.info(
- "{}, Change region leader finished for IOT_CONSENSUS, regionId:
{}, newLeaderNode: {}",
- REGION_MIGRATE_PROCESS,
- regionId,
- newLeaderNode);
+ // Throws when filterDataNodeWithOtherRegionReplica returned an empty Optional, so the
+ // newLeaderNode.get() calls below never run on an empty value.
+ newLeaderNode.orElseThrow(() -> new ProcedureException("Cannot find the
new leader"));
+
+ // ratis needs DataNode to do election by itself
+ // Fallback timestamp for the cache update below; it is overwritten only when the RPC branch succeeds.
+ long timestamp = System.nanoTime();
+ // Precedence: '&&' binds tighter than '||', i.e. SchemaRegion || (DataRegion && RATIS_CONSENSUS).
+ if (TConsensusGroupType.SchemaRegion.equals(regionId.getType())
+ || TConsensusGroupType.DataRegion.equals(regionId.getType())
+ &&
RATIS_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass())) {
+ final int MAX_RETRY_TIME = 10;
+ int retryTime = 0;
+ while (true) {
+ TRegionLeaderChangeResp resp =
+ SyncDataNodeClientPool.getInstance()
+ .changeRegionLeader(
+ regionId, originalDataNode.getInternalEndPoint(),
newLeaderNode.get());
+ if (resp.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ timestamp = resp.getConsensusLogicalTimestamp();
+ break;
+ }
+ // The post-increment compares the old counter value, so up to MAX_RETRY_TIME + 2 calls are made
+ // before giving up; there is no delay between attempts.
+ if (retryTime++ > MAX_RETRY_TIME) {
+ throw new ProcedureException("Transfer leader fail");
+ }
+ LOGGER.warn("Call changeRegionLeader fail for the {} time", retryTime);
}
-
- return;
}
- if (newLeaderNode.isPresent()) {
- TRegionLeaderChangeResp resp =
- SyncDataNodeClientPool.getInstance()
- .changeRegionLeader(
- regionId, originalDataNode.getInternalEndPoint(),
newLeaderNode.get());
- if (resp.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- configManager
- .getLoadManager()
- .forceUpdateConsensusGroupCache(
- Collections.singletonMap(
- regionId,
- new ConsensusGroupHeartbeatSample(
- resp.getConsensusLogicalTimestamp(),
newLeaderNode.get().getDataNodeId())));
- }
- LOGGER.info(
- "{}, Change region leader finished for RATIS_CONSENSUS, regionId:
{}, newLeaderNode: {}",
- REGION_MIGRATE_PROCESS,
- regionId,
- newLeaderNode);
- }
+ // Pass a heartbeat sample carrying the chosen timestamp and the new leader's DataNode id to the
+ // LoadManager cache, then invoke both RouteBalancer balancing methods.
+ configManager
+ .getLoadManager()
+ .forceUpdateConsensusGroupCache(
+ Collections.singletonMap(
+ regionId,
+ new ConsensusGroupHeartbeatSample(timestamp,
newLeaderNode.get().getDataNodeId())));
+ configManager.getLoadManager().getRouteBalancer().balanceRegionLeader();
+ configManager.getLoadManager().getRouteBalancer().balanceRegionPriority();
+
+ LOGGER.info(
+ "{}, Change region leader finished, regionId: {}, newLeaderNode: {}",
+ REGION_MIGRATE_PROCESS,
+ regionId,
+ newLeaderNode);
}
/**
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
index 24d629c48ba..bebf03f6792 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
@@ -44,7 +44,6 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -175,9 +174,15 @@ public class AddRegionPeerProcedure
.orElseThrow(
() ->
new ProcedureException(
- "Cannot roll back, because cannot find the correct
locations"))
+ "[pid{}][AddRegion] Cannot roll back, because cannot
find the correct locations"))
.getDataNodeLocations();
-
+ if (correctDataNodeLocations.remove(destDataNode)) {
+ LOGGER.warn(
+ "[pid{}][AddRegion] It appears that consensus write has not modified
the local partition table. "
+ + "Please verify whether a leader change has occurred during
this stage. "
+ + "If this log is triggered without a leader change, it
indicates a potential bug in the partition table.",
+ getProcId());
+ }
String correctStr =
correctDataNodeLocations.stream()
.map(TDataNodeLocation::getDataNodeId)
@@ -185,16 +190,19 @@ public class AddRegionPeerProcedure
.toString();
List<TDataNodeLocation> relatedDataNodeLocations = new
ArrayList<>(correctDataNodeLocations);
relatedDataNodeLocations.add(destDataNode);
- Map<Integer, TDataNodeLocation> relatedDataNodeLocationMap = new
HashMap<>();
- relatedDataNodeLocations.forEach(
- location -> relatedDataNodeLocationMap.put(location.dataNodeId,
location));
+ Map<Integer, TDataNodeLocation> relatedDataNodeLocationMap =
+ relatedDataNodeLocations.stream()
+ .collect(
+ Collectors.toMap(
+ TDataNodeLocation::getDataNodeId, dataNodeLocation ->
dataNodeLocation));
LOGGER.info(
- "[pid{}][AddRegion] Will reset peer list of consensus group {} on
DataNode {}",
+ "[pid{}][AddRegion] reset peer list: peer list of consensus group {}
on DataNode {} will be reset to {}",
getProcId(),
consensusGroupId,
- relatedDataNodeLocations.stream()
+ relatedDataNodeLocationMap.values().stream()
.map(TDataNodeLocation::getDataNodeId)
- .collect(Collectors.toList()));
+ .collect(Collectors.toList()),
+ correctStr);
Map<Integer, TSStatus> resultMap =
handler.resetPeerList(
@@ -204,7 +212,7 @@ public class AddRegionPeerProcedure
(dataNodeId, resetResult) -> {
if (resetResult.getCode() == SUCCESS_STATUS.getStatusCode()) {
LOGGER.info(
- "[pid{}][AddRegion] reset peer list: peer list of consensus
group {} on DataNode {} has been successfully to {}",
+ "[pid{}][AddRegion] reset peer list: peer list of consensus
group {} on DataNode {} has been successfully reset to {}",
getProcId(),
consensusGroupId,
dataNodeId,
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
index e459161ca90..17fdc903295 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
@@ -90,7 +90,7 @@ public class RemoveRegionPeerProcedure
consensusGroupId.getId(),
targetDataNode.getDataNodeId());
handler.updateRegionCache(consensusGroupId, targetDataNode,
RegionStatus.Removing);
- handler.changeRegionLeader(consensusGroupId, targetDataNode);
+ handler.transferRegionLeader(consensusGroupId, targetDataNode);
KillPoint.setKillPoint(state);
setNextState(REMOVE_REGION_PEER);
break;
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index e2d02b75338..e886ad51b4c 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -98,6 +98,8 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static
org.apache.iotdb.commons.utils.FileUtils.humanReadableByteCountSI;
+
public class IoTConsensusServerImpl {
private static final String CONFIGURATION_FILE_NAME = "configuration.dat";
@@ -295,13 +297,20 @@ public class IoTConsensusServerImpl {
File snapshotDir = new File(storageDir, newSnapshotDirName);
List<Path> snapshotPaths = stateMachine.getSnapshotFiles(snapshotDir);
AtomicLong snapshotSizeSumAtomic = new AtomicLong();
+ StringBuilder allFilesStr = new StringBuilder();
snapshotPaths.forEach(
- snapshotPath -> {
+ path -> {
try {
- snapshotSizeSumAtomic.addAndGet(Files.size(snapshotPath));
+ long fileSize = Files.size(path);
+ snapshotSizeSumAtomic.addAndGet(fileSize);
+ allFilesStr
+ .append("\n")
+ .append(path)
+ .append(" ")
+ .append(humanReadableByteCountSI(fileSize));
} catch (IOException e) {
logger.error(
- "[SNAPSHOT TRANSMISSION] Calculate snapshot file's size fail:
{}", snapshotPath, e);
+ "[SNAPSHOT TRANSMISSION] Calculate snapshot file's size fail:
{}", path, e);
}
});
final long snapshotSizeSum = snapshotSizeSumAtomic.get();
@@ -311,8 +320,10 @@ public class IoTConsensusServerImpl {
logger.info(
"[SNAPSHOT TRANSMISSION] Start to transmit snapshots ({} files, total
size {}) from dir {}",
snapshotPaths.size(),
- FileUtils.byteCountToDisplaySize(snapshotSizeSum),
+ humanReadableByteCountSI(snapshotSizeSum),
snapshotDir);
+ logger.info(
+ "[SNAPSHOT TRANSMISSION] All the files below shell be transmitted:
{}", allFilesStr);
try (SyncIoTConsensusServiceClient client =
syncClientManager.borrowClient(targetPeer.getEndpoint())) {
for (Path path : snapshotPaths) {
@@ -334,14 +345,15 @@ public class IoTConsensusServerImpl {
transitedSnapshotSizeSum += reader.getTotalReadSize();
transitedFilesNum++;
logger.info(
- "[SNAPSHOT TRANSMISSION] The overall progress for dir {}: files
{}/{} done, size {}/{} done, time {} passed",
+ "[SNAPSHOT TRANSMISSION] The overall progress for dir {}: files
{}/{} done, size {}/{} done, time {} passed. File {} done.",
newSnapshotDirName,
transitedFilesNum,
snapshotPaths.size(),
- FileUtils.byteCountToDisplaySize(transitedSnapshotSizeSum),
- FileUtils.byteCountToDisplaySize(snapshotSizeSum),
+ humanReadableByteCountSI(transitedSnapshotSizeSum),
+ humanReadableByteCountSI(snapshotSizeSum),
CommonDateTimeUtils.convertMillisecondToDurationStr(
- (System.nanoTime() - startTime) / 1_000_000));
+ (System.nanoTime() - startTime) / 1_000_000),
+ path);
} finally {
reader.close();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
index 11a47d7dc20..5fea6c0a008 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import
org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
@@ -180,6 +181,11 @@ public class RegionMigrateService implements IService {
} else {
SchemaRegionConsensusImpl.getInstance().resetPeerList(regionId,
correctPeers);
}
+ } catch (ConsensusGroupNotExistException e) {
+ LOGGER.warn(
+ "Reset peer list fail, this DataNode not contains peer of consensus
group {}. Maybe caused by create local peer failure.",
+ regionId,
+ e);
} catch (ConsensusException e) {
LOGGER.error("reset peer list fail", e);
return new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
index f2788381b4a..ee2f5098642 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
@@ -35,6 +35,8 @@ import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
+import java.text.CharacterIterator;
+import java.text.StringCharacterIterator;
import java.util.Arrays;
import java.util.Objects;
@@ -286,4 +288,20 @@ public class FileUtils {
Files.copy(sourceFile.toPath(), targetFile.toPath());
return targetFile;
}
+
+  /**
+   * Converts a byte count into a human-readable string using decimal (base-1000) units. Adapted
+   * from <a href="https://stackoverflow.com/a/3758880">stackoverflow</a>.
+   *
+   * <p>Values with a magnitude below 1000 are rendered as {@code "<n> B"}. Larger values are scaled
+   * by powers of 1000 and rendered with two decimals and a one-letter prefix taken from {@code
+   * "KMGTPE"}, for example {@code 1500 -> "1.50 KB"}.
+   *
+   * @param bytes the number of bytes; may be negative
+   * @return the formatted size; the decimal separator is always {@code '.'}, regardless of the
+   *     JVM's default locale
+   */
+  public static String humanReadableByteCountSI(long bytes) {
+    if (-1000 < bytes && bytes < 1000) {
+      return bytes + " B";
+    }
+    CharacterIterator ci = new StringCharacterIterator("KMGTPE");
+    long scaled = bytes;
+    // Keep dividing while the value would still round up to 1000.00 of the current unit.
+    while (scaled <= -999_950 || scaled >= 999_950) {
+      scaled /= 1000;
+      ci.next();
+    }
+    // Locale.ROOT keeps the output stable (e.g. "1.50 KB", never "1,50 KB") on any default locale.
+    return String.format(java.util.Locale.ROOT, "%.2f %cB", scaled / 1000.0, ci.current());
+  }
}