This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 61dea8bb005 [to dev/1.3][region migration] Remove retry of
RegionMigrateService::addPeer (#14409)
61dea8bb005 is described below
commit 61dea8bb005b96ba8623ffb7a9727c649788aa43
Author: Li Yu Heng <[email protected]>
AuthorDate: Fri Dec 13 12:42:51 2024 +0800
[to dev/1.3][region migration] Remove retry of
RegionMigrateService::addPeer (#14409)
* [region migration] Remove retry of RegionMigrateService::addPeer (#14362)
* done
* synchronized & snapshot name
* remove test
* for removePeer and resetPeerList
* name
---
.../apache/iotdb/consensus/iot/IoTConsensus.java | 157 +++++++++++----------
.../consensus/iot/IoTConsensusServerImpl.java | 20 +--
.../apache/iotdb/consensus/iot/StabilityTest.java | 36 -----
.../iotdb/db/service/RegionMigrateService.java | 49 +++----
4 files changed, 103 insertions(+), 159 deletions(-)
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 64e27ec7102..98f25d0716f 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -296,53 +296,55 @@ public class IoTConsensus implements IConsensus {
IoTConsensusServerImpl impl =
Optional.ofNullable(stateMachineMap.get(groupId))
.orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
- if (impl.getConfiguration().contains(peer)) {
- throw new PeerAlreadyInConsensusGroupException(groupId, peer);
- }
- try {
- // step 1: inactive new Peer to prepare for following steps
- logger.info("[IoTConsensus] inactivate new peer: {}", peer);
- impl.inactivePeer(peer, false);
+ synchronized (impl) {
+ if (impl.getConfiguration().contains(peer)) {
+ throw new PeerAlreadyInConsensusGroupException(groupId, peer);
+ }
+ try {
+ // step 1: inactive new Peer to prepare for following steps
+ logger.info("[IoTConsensus] inactivate new peer: {}", peer);
+ impl.inactivePeer(peer, false);
- // step 2: notify all the other Peers to build the sync connection to newPeer
- logger.info("[IoTConsensus] notify current peers to build sync log...");
- impl.notifyPeersToBuildSyncLogChannel(peer);
+ // step 2: notify all the other Peers to build the sync connection to newPeer
+ logger.info("[IoTConsensus] notify current peers to build sync log...");
+ impl.notifyPeersToBuildSyncLogChannel(peer);
- // step 3: take snapshot
- logger.info("[IoTConsensus] start to take snapshot...");
+ // step 3: take snapshot
+ logger.info("[IoTConsensus] start to take snapshot...");
- impl.takeSnapshot();
+ impl.takeSnapshot();
- // step 4: transit snapshot
- logger.info("[IoTConsensus] start to transmit snapshot...");
- impl.transmitSnapshot(peer);
+ // step 4: transit snapshot
+ logger.info("[IoTConsensus] start to transmit snapshot...");
+ impl.transmitSnapshot(peer);
- // step 5: let the new peer load snapshot
- logger.info("[IoTConsensus] trigger new peer to load snapshot...");
- impl.triggerSnapshotLoad(peer);
- KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_TRANSITION);
+ // step 5: let the new peer load snapshot
+ logger.info("[IoTConsensus] trigger new peer to load snapshot...");
+ impl.triggerSnapshotLoad(peer);
+ KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_TRANSITION);
- // step 6: active new Peer
- logger.info("[IoTConsensus] activate new peer...");
- impl.activePeer(peer);
+ // step 6: active new Peer
+ logger.info("[IoTConsensus] activate new peer...");
+ impl.activePeer(peer);
+
+ // step 7: notify remote peer to clean up transferred snapshot
+ logger.info("[IoTConsensus] clean up remote snapshot...");
+ try {
+ impl.cleanupRemoteSnapshot(peer);
+ } catch (ConsensusGroupModifyPeerException e) {
+ logger.warn("[IoTConsensus] failed to cleanup remote snapshot", e);
+ }
+ KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_DONE);
- // step 7: notify remote peer to clean up transferred snapshot
- logger.info("[IoTConsensus] clean up remote snapshot...");
- try {
- impl.cleanupRemoteSnapshot(peer);
} catch (ConsensusGroupModifyPeerException e) {
- logger.warn("[IoTConsensus] failed to cleanup remote snapshot", e);
+ logger.info("[IoTConsensus] add remote peer failed, automatic cleanup side effects...");
+ // try to clean up the sync log channel
+ impl.notifyPeersToRemoveSyncLogChannel(peer);
+ throw new ConsensusException(e);
+ } finally {
+ logger.info("[IoTConsensus] clean up local snapshot...");
+ impl.cleanupLocalSnapshot();
}
- KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_DONE);
-
- } catch (ConsensusGroupModifyPeerException e) {
- logger.info("[IoTConsensus] add remote peer failed, automatic cleanup side effects...");
- // try to clean up the sync log channel
- impl.notifyPeersToRemoveSyncLogChannel(peer);
- throw new ConsensusException(e);
- } finally {
- logger.info("[IoTConsensus] clean up local snapshot...");
- impl.cleanupLocalSnapshot();
}
}
@@ -352,29 +354,32 @@ public class IoTConsensus implements IConsensus {
Optional.ofNullable(stateMachineMap.get(groupId))
.orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
- if (!impl.getConfiguration().contains(peer)) {
- throw new PeerNotInConsensusGroupException(groupId, peer.toString());
- }
+ synchronized (impl) {
+ if (!impl.getConfiguration().contains(peer)) {
+ throw new PeerNotInConsensusGroupException(groupId, peer.toString());
+ }
- KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.INIT);
+ KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.INIT);
- // let other peers remove the sync channel with target peer
- impl.notifyPeersToRemoveSyncLogChannel(peer);
- KillPoint.setKillPoint(
- IoTConsensusRemovePeerCoordinatorKillPoints.AFTER_NOTIFY_PEERS_TO_REMOVE_SYNC_LOG_CHANNEL);
+ // let other peers remove the sync channel with target peer
+ impl.notifyPeersToRemoveSyncLogChannel(peer);
+ KillPoint.setKillPoint(
+ IoTConsensusRemovePeerCoordinatorKillPoints
+ .AFTER_NOTIFY_PEERS_TO_REMOVE_SYNC_LOG_CHANNEL);
- try {
- // let target peer reject new write
- impl.inactivePeer(peer, true);
- KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.AFTER_INACTIVE_PEER);
- // wait its SyncLog to complete
- impl.waitTargetPeerUntilSyncLogCompleted(peer);
- // wait its region related resource to release
- impl.waitReleaseAllRegionRelatedResource(peer);
- } catch (ConsensusGroupModifyPeerException e) {
- throw new ConsensusException(e.getMessage());
+ try {
+ // let target peer reject new write
+ impl.inactivePeer(peer, true);
+ KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.AFTER_INACTIVE_PEER);
+ // wait its SyncLog to complete
+ impl.waitTargetPeerUntilSyncLogCompleted(peer);
+ // wait its region related resource to release
+ impl.waitReleaseAllRegionRelatedResource(peer);
+ } catch (ConsensusGroupModifyPeerException e) {
+ throw new ConsensusException(e.getMessage());
+ }
+ KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.FINISH);
}
- KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.FINISH);
}
@Override
@@ -485,6 +490,7 @@ public class IoTConsensus implements IConsensus {
IoTConsensusServerImpl impl =
Optional.ofNullable(stateMachineMap.get(groupId))
.orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
+
Peer localPeer = new Peer(groupId, thisNodeId, thisNode);
if (!correctPeers.contains(localPeer)) {
logger.info(
@@ -493,25 +499,28 @@ public class IoTConsensus implements IConsensus {
deleteLocalPeer(groupId);
return;
}
- ImmutableList<Peer> currentMembers =
ImmutableList.copyOf(impl.getConfiguration());
- String previousPeerListStr = currentMembers.toString();
- for (Peer peer : currentMembers) {
- if (!correctPeers.contains(peer)) {
- if (!impl.removeSyncLogChannel(peer)) {
- logger.error(
- "[RESET PEER LIST] Failed to remove peer {}'s sync log channel from group {}",
- peer,
- groupId);
+
+ synchronized (impl) {
+ ImmutableList<Peer> currentMembers =
ImmutableList.copyOf(impl.getConfiguration());
+ String previousPeerListStr = currentMembers.toString();
+ for (Peer peer : currentMembers) {
+ if (!correctPeers.contains(peer)) {
+ if (!impl.removeSyncLogChannel(peer)) {
+ logger.error(
+ "[RESET PEER LIST] Failed to remove peer {}'s sync log channel from group {}",
+ peer,
+ groupId);
+ }
}
}
- }
- logger.info(
- "[RESET PEER LIST] Local peer list has been reset: {} -> {}",
- previousPeerListStr,
- impl.getConfiguration());
- for (Peer peer : correctPeers) {
- if (!impl.getConfiguration().contains(peer)) {
- logger.warn("[RESET PEER LIST] \"Correct peer\" {} is not in local peer list", peer);
+ logger.info(
+ "[RESET PEER LIST] Local peer list has been reset: {} -> {}",
+ previousPeerListStr,
+ impl.getConfiguration());
+ for (Peer peer : correctPeers) {
+ if (!impl.getConfiguration().contains(peer)) {
+ logger.warn("[RESET PEER LIST] \"Correct peer\" {} is not in local peer list", peer);
+ }
}
}
}
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 57256b39b6a..8ea522aea27 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -91,6 +91,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -276,10 +277,9 @@ public class IoTConsensusServerImpl {
public void takeSnapshot() throws ConsensusGroupModifyPeerException {
try {
- long newSnapshotIndex = getLatestSnapshotIndex() + 1;
newSnapshotDirName =
String.format(
- "%s_%s_%d", SNAPSHOT_DIR_NAME, thisNode.getGroupId().getId(),
newSnapshotIndex);
+ "%s_%s_%s", SNAPSHOT_DIR_NAME, thisNode.getGroupId().getId(),
UUID.randomUUID());
File snapshotDir = new File(storageDir, newSnapshotDirName);
if (snapshotDir.exists()) {
FileUtils.deleteDirectory(snapshotDir);
@@ -400,22 +400,6 @@ public class IoTConsensusServerImpl {
return originalFilePath.substring(originalFilePath.indexOf(snapshotId));
}
- private long getLatestSnapshotIndex() {
- long snapShotIndex = 0;
- File directory = new File(storageDir);
- File[] versionFiles = directory.listFiles((dir, name) ->
name.startsWith(SNAPSHOT_DIR_NAME));
- if (versionFiles == null || versionFiles.length == 0) {
- return snapShotIndex;
- }
- for (File file : versionFiles) {
- snapShotIndex =
- Math.max(
- snapShotIndex,
- Long.parseLong(SNAPSHOT_INDEX_PATTEN.matcher(file.getName()).replaceAll("")));
- }
- return snapShotIndex;
- }
-
private void clearOldSnapshot() {
File directory = new File(storageDir);
File[] versionFiles = directory.listFiles((dir, name) ->
name.startsWith(SNAPSHOT_DIR_NAME));
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
index 43e328e3833..d7675084680 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.iotdb.consensus.exception.ConsensusException;
import
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
-import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
@@ -95,7 +94,6 @@ public class StabilityTest {
peerTest();
transferLeader();
snapshotTest();
- snapshotUpgradeTest();
}
public void addConsensusGroup() {
@@ -212,38 +210,4 @@ public class StabilityTest {
Assert.assertNotEquals(versionFiles1[0].getName(),
versionFiles2[0].getName());
consensusImpl.deleteLocalPeer(dataRegionId);
}
-
- public void snapshotUpgradeTest() throws Exception {
- consensusImpl.createLocalPeer(
- dataRegionId,
- Collections.singletonList(new Peer(dataRegionId, 1, new
TEndPoint("0.0.0.0", basePort))));
- consensusImpl.triggerSnapshot(dataRegionId, false);
- long oldSnapshotIndex = System.currentTimeMillis();
- String oldSnapshotDirName =
- String.format(
- "%s_%s_%d",
- IoTConsensusServerImpl.SNAPSHOT_DIR_NAME, dataRegionId.getId(),
oldSnapshotIndex);
- File regionDir = new File(storageDir, "1_1");
- File oldSnapshotDir = new File(regionDir, oldSnapshotDirName);
- if (oldSnapshotDir.exists()) {
- FileUtils.deleteFully(oldSnapshotDir);
- }
- if (!oldSnapshotDir.mkdirs()) {
- throw new ConsensusGroupModifyPeerException(
- String.format("%s: cannot mkdir for snapshot", dataRegionId));
- }
- consensusImpl.triggerSnapshot(dataRegionId, false);
- Assert.assertFalse(oldSnapshotDir.exists());
-
- File dataDir = new File(IoTConsensus.buildPeerDir(storageDir,
dataRegionId));
-
- File[] snapshotFiles =
- dataDir.listFiles((dir, name) ->
name.startsWith(IoTConsensusServerImpl.SNAPSHOT_DIR_NAME));
- Assert.assertNotNull(snapshotFiles);
- Assert.assertEquals(1, snapshotFiles.length);
- Assert.assertEquals(
- oldSnapshotIndex + 1,
- Long.parseLong(snapshotFiles[0].getName().replaceAll(".*[^\\d](?=(\\d+))", "")));
- consensusImpl.deleteLocalPeer(dataRegionId);
- }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
index c6551c0d54d..056df802c13 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
@@ -256,42 +256,29 @@ public class RegionMigrateService implements IService {
TEndPoint destEndpoint = getConsensusEndPoint(destDataNode, regionId);
boolean addPeerSucceed = true;
Throwable throwable = null;
- for (int i = 0; i < MAX_RETRY_NUM; i++) {
- try {
- if (!addPeerSucceed) {
- Thread.sleep(SLEEP_MILLIS);
- }
- addRegionPeer(regionId, new Peer(regionId,
destDataNode.getDataNodeId(), destEndpoint));
- addPeerSucceed = true;
- } catch (PeerAlreadyInConsensusGroupException e) {
- addPeerSucceed = true;
- } catch (InterruptedException e) {
- throwable = e;
- Thread.currentThread().interrupt();
- } catch (ConsensusException e) {
- addPeerSucceed = false;
- throwable = e;
- taskLogger.error(
- "{}, executed addPeer {} for region {} error, retry times: {}",
- REGION_MIGRATE_PROCESS,
- destEndpoint,
- regionId,
- i,
- e);
- } catch (Exception e) {
- addPeerSucceed = false;
- throwable = e;
- taskLogger.warn("Unexpected exception", e);
- }
- if (addPeerSucceed || throwable instanceof InterruptedException) {
- break;
- }
+ try {
+ addRegionPeer(regionId, new Peer(regionId,
destDataNode.getDataNodeId(), destEndpoint));
+ } catch (PeerAlreadyInConsensusGroupException ignore) {
+
+ } catch (ConsensusException e) {
+ addPeerSucceed = false;
+ throwable = e;
+ taskLogger.error(
+ "{}, executed addPeer {} for region {} error",
+ REGION_MIGRATE_PROCESS,
+ destEndpoint,
+ regionId,
+ e);
+ } catch (Exception e) {
+ addPeerSucceed = false;
+ throwable = e;
+ taskLogger.warn("Unexpected exception", e);
}
if (!addPeerSucceed) {
String errorMsg =
String.format(
- "%s, AddPeer for region error after max retry times, peerId: %s, regionId: %s",
+ "%s, AddPeer for region error, peerId: %s, regionId: %s",
REGION_MIGRATE_PROCESS, destEndpoint, regionId);
taskLogger.error(errorMsg, throwable);
status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());