This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7d92f475296 [region migration] Make peer list correct when IoTConsensus & IoTConsensusV2 & Ratis starting (#14505)
7d92f475296 is described below
commit 7d92f47529645f3399727a5dbc02db2de7052021
Author: Li Yu Heng <[email protected]>
AuthorDate: Mon Dec 23 20:03:56 2024 +0800
[region migration] Make peer list correct when IoTConsensus & IoTConsensusV2 & Ratis starting (#14505)
* done
* for IoTV2?
* test conf
* fix iotv2
* (intermediate commit; original message was unreadable stray characters)
* still to handle ratis
* all done
* rename start to starting
* use string var
* spotless
* tan review
* tan review
* add UT for iotv1
* fix review
* fix RatisConsensus
* RatisConsensus only sendConfiguration if necessary
* done
* add peer list
---------
Co-authored-by: Peng Junzhi <[email protected]>
Co-authored-by: Peng Junzhi <[email protected]>
---
.../iotdb/confignode/manager/node/NodeManager.java | 8 +-
.../confignode/persistence/pipe/PipeTaskInfo.java | 12 +-
.../org/apache/iotdb/consensus/IConsensus.java | 21 ++--
.../apache/iotdb/consensus/iot/IoTConsensus.java | 93 ++++++++------
.../consensus/iot/IoTConsensusServerImpl.java | 11 +-
.../service/IoTConsensusRPCServiceProcessor.java | 9 +-
.../apache/iotdb/consensus/pipe/PipeConsensus.java | 139 ++++++++++++++-------
.../iotdb/consensus/ratis/RatisConsensus.java | 78 +++++++-----
.../iotdb/consensus/simple/SimpleConsensus.java | 12 +-
.../apache/iotdb/consensus/iot/ReplicateTest.java | 4 -
.../apache/iotdb/consensus/iot/StabilityTest.java | 82 +++++++++---
.../iotdb/consensus/ratis/RatisConsensusTest.java | 4 -
.../consensus/ConsensusPipeDataNodeDispatcher.java | 11 ++
.../java/org/apache/iotdb/db/service/DataNode.java | 91 +++++++-------
.../config/constant/PipeRPCMessageConstant.java | 31 +++++
.../src/main/thrift/confignode.thrift | 2 +-
16 files changed, 375 insertions(+), 233 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 3bba8ff8da5..ecdb8e4ba45 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.confignode.manager.node;
import org.apache.iotdb.common.rpc.thrift.TAINodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
@@ -362,11 +361,8 @@ public class NodeManager {
resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART);
resp.setRuntimeConfiguration(getRuntimeConfiguration());
- List<TConsensusGroupId> consensusGroupIds =
- getPartitionManager().getAllReplicaSets(nodeId).stream()
- .map(TRegionReplicaSet::getRegionId)
- .collect(Collectors.toList());
- resp.setConsensusGroupIds(consensusGroupIds);
+
+
resp.setCorrectConsensusGroups(getPartitionManager().getAllReplicaSets(nodeId));
return resp;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 70d23a47cc7..372209bfd74 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -78,6 +78,8 @@ import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static
org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeRPCMessageConstant.PIPE_ALREADY_EXIST_MSG;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeRPCMessageConstant.PIPE_NOT_EXIST_MSG;
public class PipeTaskInfo implements SnapshotProcessor {
@@ -179,8 +181,8 @@ public class PipeTaskInfo implements SnapshotProcessor {
final String exceptionMessage =
String.format(
- "Failed to create pipe %s, the pipe with the same name has been
created",
- createPipeRequest.getPipeName());
+ "Failed to create pipe %s, %s",
+ createPipeRequest.getPipeName(), PIPE_ALREADY_EXIST_MSG);
LOGGER.warn(exceptionMessage);
throw new PipeException(exceptionMessage);
}
@@ -204,7 +206,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
final String exceptionMessage =
String.format(
- "Failed to alter pipe %s, the pipe does not exist",
alterPipeRequest.getPipeName());
+ "Failed to alter pipe %s, %s", alterPipeRequest.getPipeName(),
PIPE_NOT_EXIST_MSG);
LOGGER.warn(exceptionMessage);
throw new PipeException(exceptionMessage);
}
@@ -281,7 +283,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
private void checkBeforeStartPipeInternal(final String pipeName) throws
PipeException {
if (!isPipeExisted(pipeName)) {
final String exceptionMessage =
- String.format("Failed to start pipe %s, the pipe does not exist",
pipeName);
+ String.format("Failed to start pipe %s, %s", pipeName,
PIPE_NOT_EXIST_MSG);
LOGGER.warn(exceptionMessage);
throw new PipeException(exceptionMessage);
}
@@ -307,7 +309,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
private void checkBeforeStopPipeInternal(final String pipeName) throws
PipeException {
if (!isPipeExisted(pipeName)) {
final String exceptionMessage =
- String.format("Failed to stop pipe %s, the pipe does not exist",
pipeName);
+ String.format("Failed to stop pipe %s, %s", pipeName,
PIPE_NOT_EXIST_MSG);
LOGGER.warn(exceptionMessage);
throw new PipeException(exceptionMessage);
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
index 0e3af42d802..8f49af524bc 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
@@ -37,6 +37,7 @@ import javax.annotation.concurrent.ThreadSafe;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
/** Consensus module base interface. */
@ThreadSafe
@@ -145,6 +146,15 @@ public interface IConsensus {
*/
void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws
ConsensusException;
+ /**
+ * Record the correct peer list (likely got from the ConfigNode) for future
use in resetPeerList.
+ * Only use this method if necessary. If it is called, it should be called
before {@link
+ * #start()}.
+ *
+ * @param correctPeerList The correct consensus group member list
+ */
+ void recordCorrectPeerListBeforeStarting(Map<ConsensusGroupId, List<Peer>>
correctPeerList);
+
/**
* Reset the peer list of the corresponding consensus group. Currently only
used in the automatic
* cleanup of region migration as a rollback for {@link
#addRemotePeer(ConsensusGroupId, Peer)},
@@ -226,17 +236,6 @@ public interface IConsensus {
*/
List<ConsensusGroupId> getAllConsensusGroupIds();
- /**
- * Return all consensus group ids from disk.
- *
- * <p>We need to parse all the RegionGroupIds from the disk directory before
starting the
- * consensus layer, and {@link #getAllConsensusGroupIds()} returns an empty
list, so we need to
- * add a new interface.
- *
- * @return consensusGroupId list
- */
- List<ConsensusGroupId> getAllConsensusGroupIdsWithoutStarting();
-
/**
* Return the region directory of the corresponding consensus group.
*
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 7f1222e5c67..987ba658538 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -80,6 +80,8 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
public class IoTConsensus implements IConsensus {
@@ -99,6 +101,7 @@ public class IoTConsensus implements IConsensus {
private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient>
syncClientManager;
private final ScheduledExecutorService backgroundTaskService;
private Future<?> updateReaderFuture;
+ private Map<ConsensusGroupId, List<Peer>> correctPeerListBeforeStart = null;
public IoTConsensus(ConsensusConfig config, Registry registry) {
this.thisNode = config.getThisNodeEndPoint();
@@ -178,10 +181,32 @@ public class IoTConsensus implements IConsensus {
syncClientManager,
config);
stateMachineMap.put(consensusGroupId, consensus);
- consensus.start();
}
}
}
+ if (correctPeerListBeforeStart != null) {
+ BiConsumer<ConsensusGroupId, List<Peer>> resetPeerListWithoutThrow =
+ (consensusGroupId, peers) -> {
+ try {
+ resetPeerList(consensusGroupId, peers);
+ } catch (ConsensusGroupNotExistException ignore) {
+
+ } catch (Exception e) {
+ logger.warn("Failed to reset peer list while start", e);
+ }
+ };
+ // make peers which are in list correct
+ correctPeerListBeforeStart.forEach(resetPeerListWithoutThrow);
+ // clear peers which are not in the list
+ stateMachineMap.keySet().stream()
+ .filter(consensusGroupId ->
!correctPeerListBeforeStart.containsKey(consensusGroupId))
+ // copy to a new list to avoid concurrent modification
+ .collect(Collectors.toList())
+ .forEach(
+ consensusGroupId ->
+ resetPeerListWithoutThrow.accept(consensusGroupId,
Collections.emptyList()));
+ }
+ stateMachineMap.values().forEach(IoTConsensusServerImpl::start);
}
@Override
@@ -435,36 +460,6 @@ public class IoTConsensus implements IConsensus {
return new ArrayList<>(stateMachineMap.keySet());
}
- @Override
- public List<ConsensusGroupId> getAllConsensusGroupIdsWithoutStarting() {
- return getConsensusGroupIdsFromDir(storageDir, logger);
- }
-
- public static List<ConsensusGroupId> getConsensusGroupIdsFromDir(File
storageDir, Logger logger) {
- if (!storageDir.exists()) {
- return Collections.emptyList();
- }
- List<ConsensusGroupId> consensusGroupIds = new ArrayList<>();
- try (DirectoryStream<Path> stream =
Files.newDirectoryStream(storageDir.toPath())) {
- for (Path path : stream) {
- try {
- String[] items = path.getFileName().toString().split("_");
- ConsensusGroupId consensusGroupId =
- ConsensusGroupId.Factory.create(
- Integer.parseInt(items[0]), Integer.parseInt(items[1]));
- consensusGroupIds.add(consensusGroupId);
- } catch (Exception e) {
- logger.info(
- "The directory {} is not a group directory;" + " ignoring it. ",
- path.getFileName().toString());
- }
- }
- } catch (IOException e) {
- logger.error("Failed to get all consensus group ids from disk", e);
- }
- return consensusGroupIds;
- }
-
@Override
public String getRegionDirFromConsensusGroupId(ConsensusGroupId groupId) {
return buildPeerDir(storageDir, groupId);
@@ -483,10 +478,16 @@ public class IoTConsensus implements IConsensus {
.init(config.getReplication().getRegionMigrationSpeedLimitBytesPerSecond());
}
+ @Override
+ public void recordCorrectPeerListBeforeStarting(
+ Map<ConsensusGroupId, List<Peer>> correctPeerList) {
+ logger.info("Record correct peer list: {}", correctPeerList);
+ this.correctPeerListBeforeStart = correctPeerList;
+ }
+
@Override
public void resetPeerList(ConsensusGroupId groupId, List<Peer> correctPeers)
throws ConsensusException {
- logger.info("[RESET PEER LIST] Start to reset peer list to {}",
correctPeers);
IoTConsensusServerImpl impl =
Optional.ofNullable(stateMachineMap.get(groupId))
.orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
@@ -501,27 +502,37 @@ public class IoTConsensus implements IConsensus {
}
synchronized (impl) {
+ // remove invalid peer
ImmutableList<Peer> currentMembers =
ImmutableList.copyOf(impl.getConfiguration());
String previousPeerListStr = currentMembers.toString();
for (Peer peer : currentMembers) {
if (!correctPeers.contains(peer)) {
if (!impl.removeSyncLogChannel(peer)) {
- logger.error(
- "[RESET PEER LIST] Failed to remove peer {}'s sync log channel
from group {}",
- peer,
- groupId);
+ logger.error("[RESET PEER LIST] Failed to remove sync channel
with: {}", peer);
+ } else {
+ logger.info("[RESET PEER LIST] Remove sync channel with: {}",
peer);
}
}
}
- logger.info(
- "[RESET PEER LIST] Local peer list has been reset: {} -> {}",
- previousPeerListStr,
- impl.getConfiguration());
+ // add correct peer
for (Peer peer : correctPeers) {
if (!impl.getConfiguration().contains(peer)) {
- logger.warn("[RESET PEER LIST] \"Correct peer\" {} is not in local
peer list", peer);
+ impl.buildSyncLogChannel(peer);
+ logger.info("[RESET PEER LIST] Build sync channel with: {}", peer);
}
}
+ // show result
+ String newPeerListStr = impl.getConfiguration().toString();
+ if (!previousPeerListStr.equals(newPeerListStr)) {
+ logger.info(
+ "[RESET PEER LIST] Local peer list has been reset: {} -> {}",
+ previousPeerListStr,
+ newPeerListStr);
+ } else {
+ logger.info(
+ "[RESET PEER LIST] The current peer list is correct, nothing need
to be reset: {}",
+ previousPeerListStr);
+ }
}
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 8ea522aea27..1a32248853a 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -645,17 +645,12 @@ public class IoTConsensusServerImpl {
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
}
- /**
- * build SyncLog channel with safeIndex as the default initial sync index.
- *
- * @throws ConsensusGroupModifyPeerException
- */
- public void buildSyncLogChannel(Peer targetPeer) throws
ConsensusGroupModifyPeerException {
+ /** build SyncLog channel with safeIndex as the default initial sync index.
*/
+ public void buildSyncLogChannel(Peer targetPeer) {
buildSyncLogChannel(targetPeer, getMinSyncIndex());
}
- public void buildSyncLogChannel(Peer targetPeer, long initialSyncIndex)
- throws ConsensusGroupModifyPeerException {
+ public void buildSyncLogChannel(Peer targetPeer, long initialSyncIndex) {
KillPoint.setKillPoint(DataNodeKillPoints.ORIGINAL_ADD_PEER_DONE);
// step 1, build sync channel in LogDispatcher
logger.info(
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index a76b0e97b06..dc88d8eb98b 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -185,13 +185,8 @@ public class IoTConsensusRPCServiceProcessor implements
IoTConsensusIService.Ifa
return new TBuildSyncLogChannelRes(status);
}
TSStatus responseStatus;
- try {
- impl.buildSyncLogChannel(new Peer(groupId, req.nodeId, req.endPoint));
- responseStatus = new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- } catch (ConsensusGroupModifyPeerException e) {
- responseStatus = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
- responseStatus.setMessage(e.getMessage());
- }
+ impl.buildSyncLogChannel(new Peer(groupId, req.nodeId, req.endPoint));
+ responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
return new TBuildSyncLogChannelRes(responseStatus);
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
index ce5acf73cc3..5328a25f9dc 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
@@ -69,6 +69,7 @@ import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -77,10 +78,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiConsumer;
import java.util.stream.Collectors;
-import static
org.apache.iotdb.consensus.iot.IoTConsensus.getConsensusGroupIdsFromDir;
-
public class PipeConsensus implements IConsensus {
private static final String CONSENSUS_PIPE_GUARDIAN_TASK_ID =
"consensus_pipe_guardian";
private static final String CLASS_NAME = PipeConsensus.class.getSimpleName();
@@ -102,6 +102,7 @@ public class PipeConsensus implements IConsensus {
private final ConsensusPipeGuardian consensusPipeGuardian;
private final IClientManager<TEndPoint, AsyncPipeConsensusServiceClient>
asyncClientManager;
private final IClientManager<TEndPoint, SyncPipeConsensusServiceClient>
syncClientManager;
+ private Map<ConsensusGroupId, List<Peer>> correctPeerListBeforeStart = null;
public PipeConsensus(ConsensusConfig config, IStateMachine.Registry
registry) {
this.thisNode = config.getThisNodeEndPoint();
@@ -146,33 +147,64 @@ public class PipeConsensus implements IConsensus {
}
} else {
// asynchronously recover, retry logic is implemented at
PipeConsensusImpl
- CompletableFuture.runAsync(
- () -> {
- try (DirectoryStream<Path> stream =
Files.newDirectoryStream(storageDir.toPath())) {
- for (Path path : stream) {
- ConsensusGroupId consensusGroupId =
- parsePeerFileName(path.getFileName().toString());
- PipeConsensusServerImpl consensus =
- new PipeConsensusServerImpl(
- new Peer(consensusGroupId, thisNodeId, thisNode),
- registry.apply(consensusGroupId),
- path.toString(),
- new ArrayList<>(),
- config,
- consensusPipeManager,
- syncClientManager);
- stateMachineMap.put(consensusGroupId, consensus);
- consensus.start(true);
- }
- } catch (Exception e) {
- LOGGER.error("Failed to recover consensus from {}",
storageDir, e);
- }
- })
- .exceptionally(
- e -> {
- LOGGER.error("Failed to recover consensus from {}",
storageDir, e);
- return null;
- });
+ CompletableFuture<Void> future =
+ CompletableFuture.runAsync(
+ () -> {
+ try (DirectoryStream<Path> stream =
+ Files.newDirectoryStream(storageDir.toPath())) {
+ for (Path path : stream) {
+ ConsensusGroupId consensusGroupId =
+ parsePeerFileName(path.getFileName().toString());
+ PipeConsensusServerImpl consensus =
+ new PipeConsensusServerImpl(
+ new Peer(consensusGroupId, thisNodeId,
thisNode),
+ registry.apply(consensusGroupId),
+ path.toString(),
+ new ArrayList<>(),
+ config,
+ consensusPipeManager,
+ syncClientManager);
+ stateMachineMap.put(consensusGroupId, consensus);
+ checkPeerListAndStartIfEligible(consensusGroupId,
consensus);
+ }
+ } catch (Exception e) {
+ LOGGER.error("Failed to recover consensus from {}",
storageDir, e);
+ }
+ })
+ .exceptionally(
+ e -> {
+ LOGGER.error("Failed to recover consensus from {}",
storageDir, e);
+ return null;
+ });
+ }
+ }
+
+ private void checkPeerListAndStartIfEligible(
+ ConsensusGroupId consensusGroupId, PipeConsensusServerImpl consensus)
throws IOException {
+ BiConsumer<ConsensusGroupId, List<Peer>> resetPeerListWithoutThrow =
+ (dataRegionId, peers) -> {
+ try {
+ resetPeerList(dataRegionId, peers);
+ } catch (ConsensusGroupNotExistException ignore) {
+
+ } catch (Exception e) {
+ LOGGER.warn("Failed to reset peer list while start", e);
+ }
+ };
+
+ if (correctPeerListBeforeStart != null) {
+ if (correctPeerListBeforeStart.containsKey(consensusGroupId)) {
+ // make peers which are in list correct
+ resetPeerListWithoutThrow.accept(
+ consensusGroupId,
correctPeerListBeforeStart.get(consensusGroupId));
+ consensus.start(true);
+ } else {
+ // clear peers which are not in the list
+ resetPeerListWithoutThrow.accept(consensusGroupId,
Collections.emptyList());
+ }
+
+ } else {
+ consensus.start(true);
}
}
@@ -412,13 +444,20 @@ public class PipeConsensus implements IConsensus {
KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.FINISH);
}
+ @Override
+ public void recordCorrectPeerListBeforeStarting(
+ Map<ConsensusGroupId, List<Peer>> correctPeerList) {
+ LOGGER.info("Record correct peer list: {}", correctPeerList);
+ this.correctPeerListBeforeStart = correctPeerList;
+ }
+
@Override
public void resetPeerList(ConsensusGroupId groupId, List<Peer> correctPeers)
throws ConsensusException {
- LOGGER.info("[RESET PEER LIST] Start to reset peer list to {}",
correctPeers);
PipeConsensusServerImpl impl =
Optional.ofNullable(stateMachineMap.get(groupId))
.orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
+
if (!correctPeers.contains(new Peer(groupId, thisNodeId, thisNode))) {
LOGGER.warn(
"[RESET PEER LIST] Local peer is not in the correct configuration,
delete local peer {}",
@@ -426,30 +465,43 @@ public class PipeConsensus implements IConsensus {
deleteLocalPeer(groupId);
return;
}
+
ImmutableList<Peer> currentPeers = ImmutableList.copyOf(impl.getPeers());
String previousPeerListStr = impl.getPeers().toString();
+ // remove invalid peer
for (Peer peer : currentPeers) {
if (!correctPeers.contains(peer)) {
try {
impl.dropConsensusPipeToTargetPeer(peer);
+ LOGGER.info("[RESET PEER LIST] Remove sync channel with: {}", peer);
} catch (ConsensusGroupModifyPeerException e) {
- LOGGER.error(
- "[RESET PEER LIST] Failed to remove peer {}'s consensus pipe
from group {}",
- peer,
- groupId,
- e);
+ LOGGER.error("[RESET PEER LIST] Failed to remove sync channel with:
{}", peer, e);
}
}
}
- LOGGER.info(
- "[RESET PEER LIST] Local peer list has been reset: {} -> {}",
- previousPeerListStr,
- impl.getPeers());
+ // add correct peer
for (Peer peer : correctPeers) {
- if (!impl.containsPeer(peer)) {
- LOGGER.warn("[RESET PEER LIST] \"Correct peer\" {} is not in local
peer list", peer);
+ if (!impl.containsPeer(peer) && peer.getNodeId() != this.thisNodeId) {
+ try {
+ impl.createConsensusPipeToTargetPeer(peer, false);
+ LOGGER.info("[RESET PEER LIST] Build sync channel with: {}", peer);
+ } catch (ConsensusGroupModifyPeerException e) {
+ LOGGER.warn("[RESET PEER LIST] Failed to build sync channel with:
{}", peer, e);
+ }
}
}
+ // show result
+ String currentPeerListStr = impl.getPeers().toString();
+ if (!previousPeerListStr.equals(currentPeerListStr)) {
+ LOGGER.info(
+ "[RESET PEER LIST] Local peer list has been reset: {} -> {}",
+ previousPeerListStr,
+ impl.getPeers());
+ } else {
+ LOGGER.info(
+ "[RESET PEER LIST] The current peer list is correct, nothing need to
be reset: {}",
+ previousPeerListStr);
+ }
}
@Override
@@ -500,11 +552,6 @@ public class PipeConsensus implements IConsensus {
return new ArrayList<>(stateMachineMap.keySet());
}
- @Override
- public List<ConsensusGroupId> getAllConsensusGroupIdsWithoutStarting() {
- return getConsensusGroupIdsFromDir(storageDir, LOGGER);
- }
-
@Override
public String getRegionDirFromConsensusGroupId(ConsensusGroupId groupId) {
return getPeerDir(groupId);
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 9b4c3274e6a..9ae8dcd6a26 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -94,19 +94,18 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.nio.file.DirectoryStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.UUID;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
@@ -146,6 +145,8 @@ class RatisConsensus implements IConsensus {
private final RatisMetricSet ratisMetricSet;
private final TConsensusGroupType consensusGroupType;
+ private Map<ConsensusGroupId, List<Peer>> correctPeerListBeforeStart = null;
+
private final ConcurrentHashMap<ConsensusGroupId, AtomicBoolean>
canServeStaleRead;
public RatisConsensus(ConsensusConfig config, IStateMachine.Registry
registry) {
@@ -235,6 +236,27 @@ class RatisConsensus implements IConsensus {
MetricService.getInstance().addMetricSet(this.ratisMetricSet);
server.get().start();
registerAndStartDiskGuardian();
+
+ if (correctPeerListBeforeStart != null) {
+ BiConsumer<ConsensusGroupId, List<Peer>> resetPeerListWithoutThrow =
+ (consensusGroupId, peers) -> {
+ try {
+ resetPeerList(consensusGroupId, peers);
+ } catch (ConsensusGroupNotExistException ignore) {
+
+ } catch (Exception e) {
+ logger.warn("Failed to reset peer list while start", e);
+ }
+ };
+ // make peers which are in list correct
+ correctPeerListBeforeStart.forEach(resetPeerListWithoutThrow);
+ // clear peers which are not in the list
+ getAllConsensusGroupIds().stream()
+ .filter(consensusGroupId ->
!correctPeerListBeforeStart.containsKey(consensusGroupId))
+ .forEach(
+ consensusGroupId ->
+ resetPeerListWithoutThrow.accept(consensusGroupId,
Collections.emptyList()));
+ }
}
@Override
@@ -591,10 +613,16 @@ class RatisConsensus implements IConsensus {
sendReconfiguration(RaftGroup.valueOf(raftGroupId, newConfig));
}
+ @Override
+ public void recordCorrectPeerListBeforeStarting(
+ Map<ConsensusGroupId, List<Peer>> correctPeerList) {
+ logger.info("Record correct peer list: {}", correctPeerList);
+ this.correctPeerListBeforeStart = correctPeerList;
+ }
+
@Override
public void resetPeerList(ConsensusGroupId groupId, List<Peer> correctPeers)
throws ConsensusException {
- logger.info("[RESET PEER LIST] Start to reset peer list to {}",
correctPeers);
final RaftGroupId raftGroupId =
Utils.fromConsensusGroupIdToRaftGroupId(groupId);
final RaftGroup group = getGroupInfo(raftGroupId);
@@ -610,7 +638,7 @@ class RatisConsensus implements IConsensus {
peer.getNodeId(), peer.getEndpoint(),
DEFAULT_PRIORITY))
.anyMatch(
raftPeer ->
- myself.getId() == raftPeer.getId()
+ myself.getId().equals(raftPeer.getId())
&& myself.getAddress().equals(raftPeer.getAddress()));
if (!myselfInCorrectPeers) {
logger.info(
@@ -624,6 +652,20 @@ class RatisConsensus implements IConsensus {
Utils.fromPeersAndPriorityToRaftPeers(correctPeers, DEFAULT_PRIORITY);
final RaftGroup newGroup = RaftGroup.valueOf(raftGroupId, newGroupPeers);
+ Set<RaftPeer> localRaftPeerSet = new HashSet<>(group.getPeers());
+ Set<RaftPeer> correctRaftPeerSet = new HashSet<>(newGroupPeers);
+ if (localRaftPeerSet.equals(correctRaftPeerSet)) {
+ // configurations are the same
+ logger.info(
+ "[RESET PEER LIST] The current peer list is correct, nothing need to
be reset: {}",
+ localRaftPeerSet);
+ return;
+ }
+
+ logger.info(
+ "[RESET PEER LIST] Peer list will be reset from {} to {}",
+ localRaftPeerSet,
+ correctRaftPeerSet);
RaftClientReply reply = sendReconfiguration(newGroup);
if (reply.isSuccess()) {
logger.info("[RESET PEER LIST] Peer list has been reset to {}",
newGroupPeers);
@@ -791,30 +833,6 @@ class RatisConsensus implements IConsensus {
}
}
- @Override
- public List<ConsensusGroupId> getAllConsensusGroupIdsWithoutStarting() {
- if (!storageDir.exists()) {
- return Collections.emptyList();
- }
- List<ConsensusGroupId> consensusGroupIds = new ArrayList<>();
- try (DirectoryStream<Path> stream =
Files.newDirectoryStream(storageDir.toPath())) {
- for (Path path : stream) {
- try {
- RaftGroupId raftGroupId =
-
RaftGroupId.valueOf(UUID.fromString(path.getFileName().toString()));
-
consensusGroupIds.add(Utils.fromRaftGroupIdToConsensusGroupId(raftGroupId));
- } catch (Exception e) {
- logger.info(
- "The directory {} is not a group directory;" + " ignoring it. ",
- path.getFileName().toString());
- }
- }
- } catch (IOException e) {
- logger.error("Failed to get all consensus group ids from disk", e);
- }
- return consensusGroupIds;
- }
-
@Override
public String getRegionDirFromConsensusGroupId(ConsensusGroupId
consensusGroupId) {
RaftGroupId raftGroupId =
Utils.fromConsensusGroupIdToRaftGroupId(consensusGroupId);
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java
index 19258309028..5800e76b008 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java
@@ -38,7 +38,6 @@ import
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
-import org.apache.iotdb.consensus.iot.IoTConsensus;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
@@ -211,6 +210,12 @@ class SimpleConsensus implements IConsensus {
throw new ConsensusException("SimpleConsensus does not support membership
changes");
}
+ @Override
+ public void recordCorrectPeerListBeforeStarting(
+ Map<ConsensusGroupId, List<Peer>> correctPeerList) {
+ logger.info("SimpleConsensus will do nothing when calling
recordCorrectPeerListBeforeStarting");
+ }
+
@Override
public void transferLeader(ConsensusGroupId groupId, Peer newLeader) throws
ConsensusException {
throw new ConsensusException("SimpleConsensus does not support leader
transfer");
@@ -254,11 +259,6 @@ class SimpleConsensus implements IConsensus {
return new ArrayList<>(stateMachineMap.keySet());
}
- @Override
- public List<ConsensusGroupId> getAllConsensusGroupIdsWithoutStarting() {
- return IoTConsensus.getConsensusGroupIdsFromDir(storageDir, logger);
- }
-
@Override
public String getRegionDirFromConsensusGroupId(ConsensusGroupId groupId) {
return buildPeerDir(groupId);
diff --git
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
index 8072ab10066..3ec7769f2ac 100644
---
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
+++
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
@@ -324,10 +324,6 @@ public class ReplicateTest {
for (int i = 0; i < CHECK_POINT_GAP; i++) {
servers.get(0).write(gid, new TestEntry(i, peers.get(0)));
}
- List<ConsensusGroupId> ids =
servers.get(0).getAllConsensusGroupIdsWithoutStarting();
-
- Assert.assertEquals(1, ids.size());
- Assert.assertEquals(gid, ids.get(0));
String regionDir = servers.get(0).getRegionDirFromConsensusGroupId(gid);
try {
diff --git
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
index d7675084680..9fcbf9e03f1 100644
---
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
+++
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.consensus.ConsensusFactory;
-import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.iotdb.consensus.exception.ConsensusException;
@@ -42,7 +41,11 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import static org.junit.Assert.assertTrue;
@@ -52,32 +55,34 @@ public class StabilityTest {
private final File storageDir = new File("target" + java.io.File.separator +
"stability");
- private IConsensus consensusImpl;
+  // Typed as IoTConsensus rather than IConsensus, presumably so tests can call
+  // IoTConsensus-specific methods such as getImpl(...) — TODO confirm.
+ private IoTConsensus consensusImpl;
private final int basePort = 6667;
public void constructConsensus() throws IOException {
+    // IOT_CONSENSUS is requested from the factory below, so the returned instance is cast to
+    // IoTConsensus to match the field type.
consensusImpl =
- ConsensusFactory.getConsensusImpl(
- ConsensusFactory.IOT_CONSENSUS,
- ConsensusConfig.newBuilder()
- .setThisNodeId(1)
- .setThisNode(new TEndPoint("0.0.0.0", basePort))
- .setStorageDir(storageDir.getAbsolutePath())
- .setConsensusGroupType(TConsensusGroupType.DataRegion)
- .build(),
- gid -> new TestStateMachine())
- .orElseThrow(
- () ->
- new IllegalArgumentException(
- String.format(
- ConsensusFactory.CONSTRUCT_FAILED_MSG,
- ConsensusFactory.IOT_CONSENSUS)));
+ (IoTConsensus)
+ ConsensusFactory.getConsensusImpl(
+ ConsensusFactory.IOT_CONSENSUS,
+ ConsensusConfig.newBuilder()
+ .setThisNodeId(1)
+ .setThisNode(new TEndPoint("0.0.0.0", basePort))
+ .setStorageDir(storageDir.getAbsolutePath())
+ .setConsensusGroupType(TConsensusGroupType.DataRegion)
+ .build(),
+ gid -> new TestStateMachine())
+ .orElseThrow(
+ () ->
+ new IllegalArgumentException(
+ String.format(
+ ConsensusFactory.CONSTRUCT_FAILED_MSG,
+ ConsensusFactory.IOT_CONSENSUS)));
consensusImpl.start();
}
@Before
public void setUp() throws Exception {
+    // Remove any files left in storageDir before constructing a fresh consensus instance.
+ FileUtils.deleteFully(storageDir);
constructConsensus();
}
@@ -210,4 +215,47 @@ public class StabilityTest {
Assert.assertNotEquals(versionFiles1[0].getName(),
versionFiles2[0].getName());
consensusImpl.deleteLocalPeer(dataRegionId);
}
+
+  @Test
+  public void recordAndResetPeerListTest() throws Exception {
+    // This node is node 1 (setThisNodeId(1) in constructConsensus); peer 2 gets its own port so
+    // it can never be mistaken for this node's endpoint.
+    Peer localPeer = new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", basePort));
+    Peer remotePeer = new Peer(dataRegionId, 2, new TEndPoint("0.0.0.0", basePort + 1));
+
+    // Create a single-replica region on this node. A ConsensusException propagates with its cause.
+    Assert.assertEquals(0, consensusImpl.getReplicationNum(dataRegionId));
+    consensusImpl.createLocalPeer(dataRegionId, Collections.singletonList(localPeer));
+    Assert.assertEquals(1, consensusImpl.getReplicationNum(dataRegionId));
+    assertConfigurationSize(1);
+    consensusImpl.stop();
+
+    // test add sync channel: a recorded list with an extra peer grows the configuration on start
+    Map<ConsensusGroupId, List<Peer>> correctPeers = new HashMap<>();
+    List<Peer> peerList1And2 = new ArrayList<>();
+    peerList1And2.add(localPeer);
+    peerList1And2.add(remotePeer);
+    correctPeers.put(dataRegionId, peerList1And2);
+    consensusImpl.recordCorrectPeerListBeforeStarting(correctPeers);
+    consensusImpl.start();
+    assertConfigurationSize(2);
+    consensusImpl.stop();
+
+    // test remove sync channel: dropping peer 2 from the recorded list shrinks the configuration
+    List<Peer> peerList1 = new ArrayList<>();
+    peerList1.add(localPeer);
+    correctPeers.put(dataRegionId, peerList1);
+    consensusImpl.recordCorrectPeerListBeforeStarting(correctPeers);
+    consensusImpl.start();
+    assertConfigurationSize(1);
+    consensusImpl.stop();
+
+    // test remove invalid peer: a recorded list that omits this node removes the local region
+    List<Peer> peerList2 = new ArrayList<>();
+    peerList2.add(remotePeer);
+    correctPeers.put(dataRegionId, peerList2);
+    consensusImpl.recordCorrectPeerListBeforeStarting(correctPeers);
+    consensusImpl.start();
+    Assert.assertNull(consensusImpl.getImpl(dataRegionId));
+  }
+
+  /** Asserts that the local region exists and its configuration holds {@code expectedSize} peers. */
+  private void assertConfigurationSize(int expectedSize) {
+    Assert.assertNotNull(
+        "expected region " + dataRegionId + " to exist", consensusImpl.getImpl(dataRegionId));
+    Assert.assertEquals(
+        expectedSize, consensusImpl.getImpl(dataRegionId).getConfiguration().size());
+  }
}
diff --git
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index 685f580e4ba..842ea78af72 100644
---
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -302,10 +302,6 @@ public class RatisConsensusTest {
servers.get(0).createLocalPeer(gid, peers.subList(0, 1));
doConsensus(0, 10, 10);
- List<ConsensusGroupId> ids =
servers.get(0).getAllConsensusGroupIdsWithoutStarting();
- Assert.assertEquals(1, ids.size());
- Assert.assertEquals(gid, ids.get(0));
-
String regionDir = servers.get(0).getRegionDirFromConsensusGroupId(gid);
try {
File regionDirFile = new File(regionDir);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeDispatcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeDispatcher.java
index 23db3caa0d7..3e56a57e3ca 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeDispatcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeDispatcher.java
@@ -37,6 +37,9 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeRPCMessageConstant.PIPE_ALREADY_EXIST_MSG;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeRPCMessageConstant.PIPE_NOT_EXIST_MSG;
+
public class ConsensusPipeDataNodeDispatcher implements
ConsensusPipeDispatcher {
private static final Logger LOGGER =
LoggerFactory.getLogger(ConsensusPipeDataNodeDispatcher.class);
@@ -64,6 +67,10 @@ public class ConsensusPipeDataNodeDispatcher implements
ConsensusPipeDispatcher
TSStatus status = configNodeClient.createPipe(req);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != status.getCode()) {
LOGGER.warn("Failed to create consensus pipe-{}, status: {}",
pipeName, status);
+ // ignore idempotence logic
+ if (status.getMessage().contains(PIPE_ALREADY_EXIST_MSG)) {
+ return;
+ }
throw new PipeException(status.getMessage());
}
} catch (Exception e) {
@@ -111,6 +118,10 @@ public class ConsensusPipeDataNodeDispatcher implements
ConsensusPipeDispatcher
final TSStatus status = configNodeClient.dropPipe(pipeName.toString());
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != status.getCode()) {
LOGGER.warn("Failed to drop consensus pipe-{}, status: {}", pipeName,
status);
+ // ignore idempotence logic
+ if (status.getMessage().contains(PIPE_NOT_EXIST_MSG)) {
+ return;
+ }
throw new PipeException(status.getMessage());
}
} catch (Exception e) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index fc2100d45d1..9a2ff29502a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -21,12 +21,12 @@ package org.apache.iotdb.db.service;
import org.apache.iotdb.common.rpc.thrift.Model;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TNodeResource;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.ServerCommandLine;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.concurrent.IoTDBDefaultThreadExceptionHandler;
@@ -65,6 +65,7 @@ import
org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
import org.apache.iotdb.confignode.rpc.thrift.TRuntimeConfiguration;
import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp;
import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.db.conf.DataNodeStartupCheck;
import org.apache.iotdb.db.conf.DataNodeSystemPropertiesHandler;
import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -126,6 +127,7 @@ import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -560,11 +562,15 @@ public class DataNode extends ServerCommandLine
implements DataNodeMBean {
}
}
- private void removeInvalidRegions(List<ConsensusGroupId>
dataNodeConsensusGroupIds) {
- removeInvalidConsensusDataRegions(dataNodeConsensusGroupIds);
+  /**
+   * Aligns local regions with the replica sets the ConfigNode returned for this restart. Each
+   * replica set is converted to its ConsensusGroupId. That id list is passed to
+   * removeInvalidDataRegions and removeInvalidSchemaRegions. The replica sets themselves are then
+   * handed to prepareToResetDataRegionPeerList, which records the correct peer lists on the data-
+   * and schema-region consensus layers before they start.
+   *
+   * <p>NOTE(review): correctRegions comes from the optional thrift field correctConsensusGroups
+   * and is not null-checked, so an unset field would throw NullPointerException here. Confirm
+   * that the ConfigNode always sets it.
+   *
+   * <p>NOTE(review): the consensus-directory cleanup previously done here
+   * (removeInvalidConsensusDataRegions / removeInvalidConsensusSchemaRegions) is no longer called.
+   * Confirm that groups absent from correctRegions are still cleaned up elsewhere.
+   */
+ private void makeRegionsCorrect(List<TRegionReplicaSet> correctRegions) {
+ List<ConsensusGroupId> dataNodeConsensusGroupIds =
+ correctRegions.stream()
+ .map(TRegionReplicaSet::getRegionId)
+ .map(ConsensusGroupId.Factory::createFromTConsensusGroupId)
+ .collect(Collectors.toList());
removeInvalidDataRegions(dataNodeConsensusGroupIds);
- removeInvalidConsensusSchemaRegions(dataNodeConsensusGroupIds);
removeInvalidSchemaRegions(dataNodeConsensusGroupIds);
+ prepareToResetDataRegionPeerList(correctRegions);
}
private void removeInvalidDataRegions(List<ConsensusGroupId>
dataNodeConsensusGroupIds) {
@@ -581,24 +587,6 @@ public class DataNode extends ServerCommandLine implements
DataNodeMBean {
});
}
- private void removeInvalidConsensusDataRegions(List<ConsensusGroupId>
dataNodeConsensusGroupIds) {
- List<ConsensusGroupId> invalidDataRegionConsensusGroupIds =
-
DataRegionConsensusImpl.getInstance().getAllConsensusGroupIdsWithoutStarting().stream()
- .filter(consensusGroupId ->
!dataNodeConsensusGroupIds.contains(consensusGroupId))
- .collect(Collectors.toList());
- if (invalidDataRegionConsensusGroupIds.isEmpty()) {
- return;
- }
- logger.info("Remove invalid dataRegion directories... {}",
invalidDataRegionConsensusGroupIds);
- for (ConsensusGroupId consensusGroupId :
invalidDataRegionConsensusGroupIds) {
- File oldDir =
- new File(
- DataRegionConsensusImpl.getInstance()
- .getRegionDirFromConsensusGroupId(consensusGroupId));
- removeDir(oldDir);
- }
- }
-
private void removeInvalidSchemaRegions(List<ConsensusGroupId>
schemaConsensusGroupIds) {
Map<String, List<SchemaRegionId>> localSchemaRegionInfo =
SchemaEngine.getLocalSchemaRegionInfo();
@@ -622,26 +610,6 @@ public class DataNode extends ServerCommandLine implements
DataNodeMBean {
});
}
- private void removeInvalidConsensusSchemaRegions(
- List<ConsensusGroupId> dataNodeConsensusGroupIds) {
- List<ConsensusGroupId> invalidSchemaRegionConsensusGroupIds =
-
SchemaRegionConsensusImpl.getInstance().getAllConsensusGroupIdsWithoutStarting().stream()
- .filter(consensusGroupId ->
!dataNodeConsensusGroupIds.contains(consensusGroupId))
- .collect(Collectors.toList());
- if (invalidSchemaRegionConsensusGroupIds.isEmpty()) {
- return;
- }
- logger.info(
- "Remove invalid schemaRegion directories... {}",
invalidSchemaRegionConsensusGroupIds);
- for (ConsensusGroupId consensusGroupId :
invalidSchemaRegionConsensusGroupIds) {
- File oldDir =
- new File(
- SchemaRegionConsensusImpl.getInstance()
- .getRegionDirFromConsensusGroupId(consensusGroupId));
- removeDir(oldDir);
- }
- }
-
private void removeInvalidSchemaDir(String database, SchemaRegionId
schemaRegionId) {
String systemSchemaDir =
config.getSystemDir() + File.separator + database + File.separator +
schemaRegionId.getId();
@@ -657,6 +625,39 @@ public class DataNode extends ServerCommandLine implements
DataNodeMBean {
}
}
+  /**
+   * Converts the given replica sets into per-region peer lists and records them on the DataRegion
+   * and SchemaRegion consensus layers via {@code recordCorrectPeerListBeforeStarting}.
+   *
+   * <p>Despite the method name, SchemaRegion replica sets are handled as well. Replica sets of any
+   * other consensus group type are ignored.
+   *
+   * @param correctRegions replica sets considered correct; every DataNode location of a replica set
+   *     becomes a {@link Peer} on that DataNode's data-region or schema-region consensus endpoint
+   */
+  private void prepareToResetDataRegionPeerList(List<TRegionReplicaSet> correctRegions) {
+    Map<ConsensusGroupId, List<Peer>> correctPeerListForDataRegion = new HashMap<>();
+    Map<ConsensusGroupId, List<Peer>> correctPeerListForSchemaRegion = new HashMap<>();
+    for (TRegionReplicaSet regionReplicaSet : correctRegions) {
+      ConsensusGroupId consensusGroupId =
+          ConsensusGroupId.Factory.createFromTConsensusGroupId(regionReplicaSet.getRegionId());
+      boolean isDataRegion = consensusGroupId.getType() == TConsensusGroupType.DataRegion;
+      if (!isDataRegion && consensusGroupId.getType() != TConsensusGroupType.SchemaRegion) {
+        // Other group types are not handled here.
+        continue;
+      }
+      List<Peer> peerList = new ArrayList<>();
+      for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
+        // Data and schema regions are reached through different consensus endpoints.
+        TEndPoint consensusEndPoint =
+            isDataRegion
+                ? dataNodeLocation.getDataRegionConsensusEndPoint()
+                : dataNodeLocation.getSchemaRegionConsensusEndPoint();
+        peerList.add(
+            new Peer(consensusGroupId, dataNodeLocation.getDataNodeId(), consensusEndPoint));
+      }
+      if (isDataRegion) {
+        correctPeerListForDataRegion.put(consensusGroupId, peerList);
+      } else {
+        correctPeerListForSchemaRegion.put(consensusGroupId, peerList);
+      }
+    }
+    DataRegionConsensusImpl.getInstance()
+        .recordCorrectPeerListBeforeStarting(correctPeerListForDataRegion);
+    SchemaRegionConsensusImpl.getInstance()
+        .recordCorrectPeerListBeforeStarting(correctPeerListForSchemaRegion);
+  }
+
private void sendRestartRequestToConfigNode() throws StartupException {
logger.info("Sending restart request to ConfigNode-leader...");
long startTime = System.currentTimeMillis();
@@ -709,11 +710,7 @@ public class DataNode extends ServerCommandLine implements
DataNodeMBean {
config.getClusterName(),
(endTime - startTime));
- List<TConsensusGroupId> consensusGroupIds =
dataNodeRestartResp.getConsensusGroupIds();
- removeInvalidRegions(
- consensusGroupIds.stream()
- .map(ConsensusGroupId.Factory::createFromTConsensusGroupId)
- .collect(Collectors.toList()));
+ makeRegionsCorrect(dataNodeRestartResp.getCorrectConsensusGroups());
} else {
/* Throw exception when restart is rejected */
throw new StartupException(dataNodeRestartResp.getStatus().getMessage());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeRPCMessageConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeRPCMessageConstant.java
new file mode 100644
index 00000000000..7e9646e5a86
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeRPCMessageConstant.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.config.constant;
+
+public final class PipeRPCMessageConstant {
+  // These two messages are matched as substrings by multiple modules (pipe, IoTV2); keep wording.
+  public static final String PIPE_ALREADY_EXIST_MSG =
+      "the pipe with the same name has been created";
+  public static final String PIPE_NOT_EXIST_MSG = "the pipe does not exist";
+
+  private PipeRPCMessageConstant() {
+    throw new IllegalStateException("Utility class");
+  }
+}
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index 5f090003119..f2e6c140091 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -134,7 +134,7 @@ struct TDataNodeRestartResp {
1: required common.TSStatus status
2: required list<common.TConfigNodeLocation> configNodeList
3: optional TRuntimeConfiguration runtimeConfiguration
- 4: optional list<common.TConsensusGroupId> consensusGroupIds
+  5: optional list<common.TRegionReplicaSet> correctConsensusGroups // id 4 retired (was list<TConsensusGroupId>); never reuse it
}
struct TDataNodeRemoveReq {