This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new b622ace6410 [to dev/1.3] IoTConsensus no longer stores the peer list
locally on the DataNode (#14900)
b622ace6410 is described below
commit b622ace641000303a62fb8911ffc56538c9aece2
Author: Li Yu Heng <[email protected]>
AuthorDate: Fri Feb 21 17:29:46 2025 +0800
[to dev/1.3] IoTConsensus no longer stores the peer list locally on the
DataNode (#14900)
* IoTConsensus and IoTConsensusV2 no longer store the peer list locally on
the DataNode (#14814)
Co-authored-by: Peng Junzhi <[email protected]>
Co-authored-by: Xiangpeng Hu <[email protected]>
(cherry picked from commit 4bd2f259c50f228913adb7dc61e8321c42bdf43d)
* Fix IoTConsensus safe deleted index (#14883)
* done
* move
(cherry picked from commit 9e973b751cfa4a9bb471f8fdee7100f329f8c27b)
* Fix IoTConsensus safe deleted index again (#14897)
* ?
* spotless
(cherry picked from commit 4aadc09fb200af5d10382c507413eb69b4447710)
* revert the IoTConsensusV2 (iotv2) changes
* fix the region-migration integration test (IT)
---
...IoTDBRegionOperationReliabilityITFramework.java | 72 ------
.../org/apache/iotdb/consensus/common/Peer.java | 13 +-
.../apache/iotdb/consensus/iot/IoTConsensus.java | 30 ++-
.../consensus/iot/IoTConsensusServerImpl.java | 262 +++------------------
.../consensus/iot/logdispatcher/LogDispatcher.java | 9 +-
.../service/IoTConsensusRPCServiceProcessor.java | 2 +-
.../apache/iotdb/consensus/iot/ReplicateTest.java | 63 ++---
.../iotdb/commons/consensus/ConsensusGroupId.java | 7 +-
8 files changed, 100 insertions(+), 358 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java
index 76c42c378d3..0dccc669eeb 100644
---
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
import org.apache.iotdb.consensus.ConsensusFactory;
-import org.apache.iotdb.consensus.iot.IoTConsensusServerImpl;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.it.env.EnvFactory;
@@ -95,7 +94,6 @@ public class IoTDBRegionOperationReliabilityITFramework {
private static final String SHOW_DATANODES = "show datanodes";
private static final String COUNT_TIMESERIES = "select count(*) from
root.sg.**";
private static final String REGION_MIGRATE_COMMAND_FORMAT = "migrate region
%d from %d to %d";
- private static final String CONFIGURATION_FILE_NAME = "configuration.dat";
ExecutorService executorService =
IoTDBThreadPoolFactory.newCachedThreadPool("regionMigrateIT");
public static Consumer<KillPointContext> actionOfKillNode =
@@ -232,7 +230,6 @@ public class IoTDBRegionOperationReliabilityITFramework {
final int destDataNode =
selectDataNodeNotContainsRegion(allDataNodeId, regionMap,
selectedRegion);
checkRegionFileExist(originalDataNode);
- checkPeersExist(regionMap.get(selectedRegion), originalDataNode,
selectedRegion);
// set kill points
if (killNode == KillNode.ORIGINAL_DATANODE) {
@@ -300,12 +297,10 @@ public class IoTDBRegionOperationReliabilityITFramework {
if (success) {
checkRegionFileClearIfNodeAlive(originalDataNode);
checkRegionFileExistIfNodeAlive(destDataNode);
- checkPeersClearIfNodeAlive(allDataNodeId, originalDataNode,
selectedRegion);
checkClusterStillWritable();
} else {
checkRegionFileClearIfNodeAlive(destDataNode);
checkRegionFileExistIfNodeAlive(originalDataNode);
- checkPeersClearIfNodeAlive(allDataNodeId, destDataNode,
selectedRegion);
}
LOGGER.info("test pass");
@@ -623,63 +618,6 @@ public class IoTDBRegionOperationReliabilityITFramework {
LOGGER.info("Original DataNode {} region file clear", dataNode);
}
- private static void checkPeersExistIfNodeAlive(
- Set<Integer> dataNodes, int originalDataNode, int regionId) {
- dataNodes.forEach(
- targetDataNode -> checkPeerExistIfNodeAlive(targetDataNode,
originalDataNode, regionId));
- }
-
- private static void checkPeersExist(Set<Integer> dataNodes, int
originalDataNode, int regionId) {
- dataNodes.forEach(targetDataNode -> checkPeerExist(targetDataNode,
originalDataNode, regionId));
- }
-
- private static void checkPeerExistIfNodeAlive(
- int checkTargetDataNode, int originalDataNode, int regionId) {
- if
(EnvFactory.getEnv().dataNodeIdToWrapper(checkTargetDataNode).get().isAlive()) {
- checkPeerExist(checkTargetDataNode, originalDataNode, regionId);
- }
- }
-
- private static void checkPeerExist(int checkTargetDataNode, int
originalDataNode, int regionId) {
- File expectExistedFile =
- new File(buildConfigurationDataFilePath(checkTargetDataNode,
originalDataNode, regionId));
- Assert.assertTrue(
- "configuration file should exist, but it didn't: " +
expectExistedFile.getPath(),
- expectExistedFile.exists());
- }
-
- private static void checkPeersClearIfNodeAlive(
- Set<Integer> dataNodes, int originalDataNode, int regionId) {
- dataNodes.stream()
- .filter(dataNode -> dataNode != originalDataNode)
- .forEach(
- targetDataNode ->
- checkPeerClearIfNodeAlive(targetDataNode, originalDataNode,
regionId));
- }
-
- private static void checkPeersClear(Set<Integer> dataNodes, int
originalDataNode, int regionId) {
- dataNodes.stream()
- .filter(dataNode -> dataNode != originalDataNode)
- .forEach(targetDataNode -> checkPeerClear(targetDataNode,
originalDataNode, regionId));
- LOGGER.info("Peer clear");
- }
-
- private static void checkPeerClearIfNodeAlive(
- int checkTargetDataNode, int originalDataNode, int regionId) {
- if
(EnvFactory.getEnv().dataNodeIdToWrapper(checkTargetDataNode).get().isAlive()) {
- checkPeerClear(checkTargetDataNode, originalDataNode, regionId);
- }
- }
-
- private static void checkPeerClear(int checkTargetDataNode, int
originalDataNode, int regionId) {
- File expectDeletedFile =
- new File(buildConfigurationDataFilePath(checkTargetDataNode,
originalDataNode, regionId));
- Assert.assertFalse(
- "configuration file should be deleted, but it didn't: " +
expectDeletedFile.getPath(),
- expectDeletedFile.exists());
- LOGGER.info("configuration file has been deleted: {}",
expectDeletedFile.getPath());
- }
-
private void checkClusterStillWritable() {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
@@ -728,16 +666,6 @@ public class IoTDBRegionOperationReliabilityITFramework {
+ (isSequence ? IoTDBConstant.SEQUENCE_FOLDER_NAME :
IoTDBConstant.UNSEQUENCE_FOLDER_NAME);
}
- private static String buildConfigurationDataFilePath(
- int localDataNodeId, int remoteDataNodeId, int regionId) {
- String configurationDatDirName =
- buildRegionDirPath(localDataNodeId) + File.separator + "1_" + regionId;
- String expectDeletedFileName =
- IoTConsensusServerImpl.generateConfigurationDatFileName(
- remoteDataNodeId, CONFIGURATION_FILE_NAME);
- return configurationDatDirName + File.separator + expectDeletedFileName;
- }
-
protected static KeySetView<String, Boolean> noKillPoints() {
return ConcurrentHashMap.newKeySet();
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
index e60efc6565f..bc6ec923aaf 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
@@ -35,14 +35,15 @@ import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Comparator;
import java.util.Objects;
-public class Peer {
+public class Peer implements Comparable<Peer> {
private final Logger logger = LoggerFactory.getLogger(Peer.class);
private final ConsensusGroupId groupId;
- private final TEndPoint endpoint;
private final int nodeId;
+ private final TEndPoint endpoint;
public Peer(ConsensusGroupId groupId, int nodeId, TEndPoint endpoint) {
this.groupId = groupId;
@@ -105,6 +106,14 @@ public class Peer {
return "Peer{" + "groupId=" + groupId + ", endpoint=" + endpoint + ",
nodeId=" + nodeId + '}';
}
+ @Override
+ public int compareTo(Peer peer) {
+ return Comparator.comparing(Peer::getGroupId)
+ .thenComparingInt(Peer::getNodeId)
+ .thenComparing(Peer::getEndpoint)
+ .compare(this, peer);
+ }
+
public static Peer valueOf(
TConsensusGroupId consensusGroupId, TDataNodeLocation dataNodeLocation) {
if (consensusGroupId.getType() == TConsensusGroupType.SchemaRegion) {
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 7f41f4bd582..e3a356bdb1b 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -75,6 +75,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
@@ -174,7 +175,7 @@ public class IoTConsensus implements IConsensus {
new IoTConsensusServerImpl(
path.toString(),
new Peer(consensusGroupId, thisNodeId, thisNode),
- new ArrayList<>(),
+ new TreeSet<>(),
registry.apply(consensusGroupId),
backgroundTaskService,
clientManager,
@@ -188,7 +189,7 @@ public class IoTConsensus implements IConsensus {
BiConsumer<ConsensusGroupId, List<Peer>> resetPeerListWithoutThrow =
(consensusGroupId, peers) -> {
try {
- resetPeerList(consensusGroupId, peers);
+ resetPeerListImpl(consensusGroupId, peers, false);
} catch (ConsensusGroupNotExistException ignore) {
} catch (Exception e) {
@@ -281,7 +282,7 @@ public class IoTConsensus implements IConsensus {
new IoTConsensusServerImpl(
path,
new Peer(groupId, thisNodeId, thisNode),
- peers,
+ new TreeSet<>(peers),
registry.apply(groupId),
backgroundTaskService,
clientManager,
@@ -490,6 +491,12 @@ public class IoTConsensus implements IConsensus {
@Override
public void resetPeerList(ConsensusGroupId groupId, List<Peer> correctPeers)
throws ConsensusException {
+ resetPeerListImpl(groupId, correctPeers, true);
+ }
+
+ private void resetPeerListImpl(
+ ConsensusGroupId groupId, List<Peer> correctPeers, boolean startNow)
+ throws ConsensusException {
IoTConsensusServerImpl impl =
Optional.ofNullable(stateMachineMap.get(groupId))
.orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
@@ -497,7 +504,7 @@ public class IoTConsensus implements IConsensus {
Peer localPeer = new Peer(groupId, thisNodeId, thisNode);
if (!correctPeers.contains(localPeer)) {
logger.info(
- "[RESET PEER LIST] Local peer is not in the correct configuration,
delete local peer {}",
+ "[RESET PEER LIST] {} Local peer is not in the correct
configuration, delete it.",
groupId);
deleteLocalPeer(groupId);
return;
@@ -510,29 +517,32 @@ public class IoTConsensus implements IConsensus {
for (Peer peer : currentMembers) {
if (!correctPeers.contains(peer)) {
if (!impl.removeSyncLogChannel(peer)) {
- logger.error("[RESET PEER LIST] Failed to remove sync channel
with: {}", peer);
+ logger.error(
+ "[RESET PEER LIST] {} Failed to remove sync channel with: {}",
groupId, peer);
} else {
- logger.info("[RESET PEER LIST] Remove sync channel with: {}",
peer);
+ logger.info("[RESET PEER LIST] {} Remove sync channel with: {}",
groupId, peer);
}
}
}
// add correct peer
for (Peer peer : correctPeers) {
if (!impl.getConfiguration().contains(peer)) {
- impl.buildSyncLogChannel(peer);
- logger.info("[RESET PEER LIST] Build sync channel with: {}", peer);
+ impl.buildSyncLogChannel(peer, startNow);
+ logger.info("[RESET PEER LIST] {} Build sync channel with: {}",
groupId, peer);
}
}
// show result
String newPeerListStr = impl.getConfiguration().toString();
if (!previousPeerListStr.equals(newPeerListStr)) {
logger.info(
- "[RESET PEER LIST] Local peer list has been reset: {} -> {}",
+ "[RESET PEER LIST] {} Local peer list has been reset: {} -> {}",
+ groupId,
previousPeerListStr,
newPeerListStr);
} else {
logger.info(
- "[RESET PEER LIST] The current peer list is correct, nothing need
to be reset: {}",
+ "[RESET PEER LIST] {} The current peer list is correct, nothing
need to be reset: {}",
+ groupId,
previousPeerListStr);
}
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index ee049ee4d9c..a13a19dde8d 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -73,11 +73,9 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
@@ -85,12 +83,11 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Objects;
import java.util.PriorityQueue;
-import java.util.Set;
+import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
@@ -100,15 +97,11 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
import static
org.apache.iotdb.commons.utils.FileUtils.humanReadableByteCountSI;
public class IoTConsensusServerImpl {
- private static final String CONFIGURATION_FILE_NAME = "configuration.dat";
- private static final String CONFIGURATION_TMP_FILE_NAME =
"configuration.dat.tmp";
public static final String SNAPSHOT_DIR_NAME = "snapshot";
private static final Pattern SNAPSHOT_INDEX_PATTEN =
Pattern.compile(".*[^\\d](?=(\\d+))");
private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS
=
@@ -120,7 +113,7 @@ public class IoTConsensusServerImpl {
private final Lock stateMachineLock = new ReentrantLock();
private final Condition stateMachineCondition =
stateMachineLock.newCondition();
private final String storageDir;
- private final List<Peer> configuration;
+ private final TreeSet<Peer> configuration;
private final AtomicLong searchIndex;
private final LogDispatcher logDispatcher;
private IoTConsensusConfig config;
@@ -137,7 +130,7 @@ public class IoTConsensusServerImpl {
public IoTConsensusServerImpl(
String storageDir,
Peer thisNode,
- List<Peer> configuration,
+ TreeSet<Peer> configuration,
IStateMachine stateMachine,
ScheduledExecutorService backgroundTaskService,
IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager,
@@ -150,25 +143,14 @@ public class IoTConsensusServerImpl {
this.cacheQueueMap = new ConcurrentHashMap<>();
this.syncClientManager = syncClientManager;
this.configuration = configuration;
- if (configuration.isEmpty()) {
- recoverConfiguration();
- } else {
- persistConfiguration();
- }
this.backgroundTaskService = backgroundTaskService;
this.config = config;
this.consensusGroupId = thisNode.getGroupId().toString();
- consensusReqReader = (ConsensusReqReader) stateMachine.read(new
GetConsensusReqReaderPlan());
+ this.consensusReqReader =
+ (ConsensusReqReader) stateMachine.read(new
GetConsensusReqReaderPlan());
this.searchIndex = new
AtomicLong(consensusReqReader.getCurrentSearchIndex());
this.ioTConsensusServerMetrics = new IoTConsensusServerMetrics(this);
this.logDispatcher = new LogDispatcher(this, clientManager);
- // Since the underlying wal does not persist safelyDeletedSearchIndex,
IoTConsensus needs to
- // update wal with its syncIndex recovered from the consensus layer when
initializing.
- // This prevents wal from being piled up if the safelyDeletedSearchIndex
is not updated after
- // the restart and Leader migration occurs
- checkAndUpdateSafeDeletedSearchIndex();
- // see message in logs for details
- checkAndUpdateSearchIndex();
}
public IStateMachine getStateMachine() {
@@ -176,6 +158,7 @@ public class IoTConsensusServerImpl {
}
public void start() {
+ checkAndUpdateIndex();
MetricService.getInstance().addMetricSet(this.ioTConsensusServerMetrics);
stateMachine.start();
logDispatcher.start();
@@ -498,7 +481,7 @@ public class IoTConsensusServerImpl {
if (peer.equals(thisNode)) {
// use searchIndex for thisNode as the initialSyncIndex because
targetPeer will load the
// snapshot produced by thisNode
- buildSyncLogChannel(targetPeer);
+ buildSyncLogChannel(targetPeer, true);
} else {
// use RPC to tell other peers to build sync log channel to target peer
try (SyncIoTConsensusServiceClient client =
@@ -646,23 +629,22 @@ public class IoTConsensusServerImpl {
}
/** build SyncLog channel with safeIndex as the default initial sync index.
*/
- public void buildSyncLogChannel(Peer targetPeer) {
- buildSyncLogChannel(targetPeer, getMinSyncIndex());
+ public void buildSyncLogChannel(Peer targetPeer, boolean startNow) {
+ buildSyncLogChannel(targetPeer, getMinSyncIndex(), startNow);
}
- public void buildSyncLogChannel(Peer targetPeer, long initialSyncIndex) {
+ public void buildSyncLogChannel(Peer targetPeer, long initialSyncIndex,
boolean startNow) {
KillPoint.setKillPoint(DataNodeKillPoints.ORIGINAL_ADD_PEER_DONE);
- // step 1, build sync channel in LogDispatcher
+ configuration.add(targetPeer);
+ if (Objects.equals(targetPeer, thisNode)) {
+ return;
+ }
+ logDispatcher.addLogDispatcherThread(targetPeer, initialSyncIndex,
startNow);
logger.info(
- "[IoTConsensus] build sync log channel to {} with initialSyncIndex {}",
+ "[IoTConsensus] Successfully build sync log channel to {} with
initialSyncIndex {}. {}",
targetPeer,
- initialSyncIndex);
- logDispatcher.addLogDispatcherThread(targetPeer, initialSyncIndex);
- // step 2, update configuration
- configuration.add(targetPeer);
- // step 3, persist configuration
- persistConfiguration();
- logger.info("[IoTConsensus Configuration] persist new configuration: {}",
configuration);
+ initialSyncIndex,
+ startNow ? "Sync log channel has started." : "Sync log channel maybe
start later.");
}
/**
@@ -689,8 +671,6 @@ public class IoTConsensusServerImpl {
// step 2, update configuration
configuration.remove(targetPeer);
checkAndUpdateSafeDeletedSearchIndex();
- // step 3, persist configuration
- persistConfiguration();
logger.info(
"[IoTConsensus Configuration] Configuration updated to {}. {}",
this.configuration,
@@ -698,103 +678,6 @@ public class IoTConsensusServerImpl {
return !exceptionHappened;
}
- public void persistConfiguration() {
- try {
- removeDuplicateConfiguration();
- renameTmpConfigurationFileToRemoveSuffix();
- serializeConfigurationAndFsyncToDisk();
- deleteConfiguration();
- renameTmpConfigurationFileToRemoveSuffix();
- } catch (IOException e) {
- // TODO: (xingtanzjr) need to handle the IOException because the
IoTConsensus won't
- // work expectedly
- // if the exception occurs
- logger.error("Unexpected error occurs when persisting configuration", e);
- }
- }
-
- public void recoverConfiguration() {
- try {
- Path tmpConfigurationPath =
- Paths.get(new File(storageDir,
CONFIGURATION_TMP_FILE_NAME).getAbsolutePath());
- Path configurationPath =
- Paths.get(new File(storageDir,
CONFIGURATION_FILE_NAME).getAbsolutePath());
- // If the tmpConfigurationPath exists, it means the
`persistConfigurationUpdate` is
- // interrupted
- // unexpectedly, we need substitute configuration with tmpConfiguration
file
- if (Files.exists(tmpConfigurationPath)) {
- Files.deleteIfExists(configurationPath);
- Files.move(tmpConfigurationPath, configurationPath);
- logger.info(
- "[IoTConsensus Configuration] recover configuration from
tmpConfigurationFile, {}",
- tmpConfigurationPath);
- }
- if (Files.exists(configurationPath)) {
- recoverFromOldConfigurationFile(configurationPath);
- logger.info(
- "[IoTConsensus Configuration] recover configuration from
oldConfigurationFile, {}",
- configurationPath);
- } else {
- // recover from split configuration file
- logger.info(
- "[IoTConsensus Configuration] recover configuration from old split
configuration file");
- Path dirPath = Paths.get(storageDir);
- List<Peer> tmpPeerList = getConfiguration(dirPath,
CONFIGURATION_TMP_FILE_NAME);
- configuration.addAll(tmpPeerList);
- logger.info(
- "[IoTConsensus Configuration] recover configuration from
tmpPeerList, {}",
- configuration);
- List<Peer> peerList = getConfiguration(dirPath,
CONFIGURATION_FILE_NAME);
- for (Peer peer : peerList) {
- if (!configuration.contains(peer)) {
- configuration.add(peer);
- }
- }
- logger.info(
- "[IoTConsensus Configuration] recover configuration from peerList,
{}", configuration);
- persistConfiguration();
- }
- logger.info("Recover IoTConsensus server Impl, configuration: {}",
configuration);
- } catch (IOException e) {
- logger.error("Unexpected error occurs when recovering configuration", e);
- }
- }
-
- // @Compatibility
- private void recoverFromOldConfigurationFile(Path oldConfigurationPath)
throws IOException {
- // recover from old configuration file
- ByteBuffer buffer =
ByteBuffer.wrap(Files.readAllBytes(oldConfigurationPath));
- int size = buffer.getInt();
- for (int i = 0; i < size; i++) {
- configuration.add(Peer.deserialize(buffer));
- }
- persistConfiguration();
- }
-
- public static String generateConfigurationDatFileName(int nodeId, String
suffix) {
- return nodeId + "_" + suffix;
- }
-
- private List<Peer> getConfiguration(Path dirPath, String
configurationFileName)
- throws IOException {
- ByteBuffer buffer;
- List<Peer> tmpConfiguration = new ArrayList<>();
- Path[] files =
- Files.walk(dirPath)
- .filter(Files::isRegularFile)
- .filter(filePath ->
filePath.getFileName().toString().contains(configurationFileName))
- .toArray(Path[]::new);
- logger.info(
- "[IoTConsensus Configuration] getConfiguration: fileName, {},
fileList: {}",
- configurationFileName,
- files);
- for (Path file : files) {
- buffer = ByteBuffer.wrap(Files.readAllBytes(file));
- tmpConfiguration.add(Peer.deserialize(buffer));
- }
- return tmpConfiguration;
- }
-
public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest(
IConsensusRequest request) {
if (request instanceof ComparableConsensusRequest) {
@@ -832,7 +715,7 @@ public class IoTConsensusServerImpl {
}
public List<Peer> getConfiguration() {
- return configuration;
+ return new ArrayList<>(configuration);
}
public long getSearchIndex() {
@@ -943,12 +826,25 @@ public class IoTConsensusServerImpl {
}
}
+ void checkAndUpdateIndex() {
+ // Since the underlying wal does not persist safelyDeletedSearchIndex,
IoTConsensus needs to
+ // update wal with its syncIndex recovered from the consensus layer when
initializing.
+ // This prevents wal from being piled up if the safelyDeletedSearchIndex
is not updated after
+ // the restart and Leader migration occurs
+ checkAndUpdateSafeDeletedSearchIndex();
+ // see message in logs for details
+ checkAndUpdateSearchIndex();
+ }
+
/**
- * If there is only one replica, set it to Long.MAX_VALUE.、 If there are
multiple replicas, get
- * the latest SafelyDeletedSearchIndex again. This enables wal to be deleted
in a timely manner.
+ * If there is only one replica, set it to Long.MAX_VALUE. If there are
multiple replicas, get the
+ * latest SafelyDeletedSearchIndex again. This enables wal to be deleted in
a timely manner.
*/
- public void checkAndUpdateSafeDeletedSearchIndex() {
- if (configuration.size() == 1) {
+ void checkAndUpdateSafeDeletedSearchIndex() {
+ if (configuration.isEmpty()) {
+ logger.error(
+ "Configuration is empty, which is unexpected. Safe deleted search
index won't be updated this time.");
+ } else if (configuration.size() == 1) {
consensusReqReader.setSafelyDeletedSearchIndex(Long.MAX_VALUE);
} else {
consensusReqReader.setSafelyDeletedSearchIndex(getMinFlushedSyncIndex());
@@ -984,90 +880,6 @@ public class IoTConsensusServerImpl {
return consensusGroupId;
}
- private void serializeConfigurationAndFsyncToDisk() throws IOException {
- for (Peer peer : configuration) {
- String peerConfigurationFileName =
- generateConfigurationDatFileName(peer.getNodeId(),
CONFIGURATION_TMP_FILE_NAME);
- FileOutputStream fileOutputStream =
- new FileOutputStream(new File(storageDir,
peerConfigurationFileName));
- try (DataOutputStream outputStream = new
DataOutputStream(fileOutputStream)) {
- peer.serialize(outputStream);
- } finally {
- try {
- fileOutputStream.flush();
- fileOutputStream.getFD().sync();
- } catch (IOException ignore) {
- // ignore sync exception
- }
- }
- logger.info("[IoTConsensus Configuration] serializeConfiguration: {}",
peer);
- }
- }
-
- private void renameTmpConfigurationFileToRemoveSuffix() throws IOException {
- try (Stream<Path> stream = Files.list(Paths.get(storageDir))) {
- List<Path> paths =
- stream
- .filter(Files::isRegularFile)
- .filter(
- filePath ->
-
filePath.getFileName().toString().endsWith(CONFIGURATION_TMP_FILE_NAME))
- .collect(Collectors.toList());
- for (Path filePath : paths) {
- String targetPath =
- filePath.toString().replace(CONFIGURATION_TMP_FILE_NAME,
CONFIGURATION_FILE_NAME);
- File targetFile = new File(targetPath);
- if (targetFile.exists()) {
- try {
- Files.delete(targetFile.toPath());
- } catch (IOException e) {
- logger.error("Unexpected error occurs when delete file: {}",
targetPath, e);
- }
- }
- if (!filePath.toFile().renameTo(targetFile)) {
- logger.error("Unexpected error occurs when rename file: {} -> {}",
filePath, targetPath);
- }
- logger.info("[IoTConsensus Configuration] renameTmpConfigurationFile:
{}", targetPath);
- }
- } catch (UncheckedIOException e) {
- throw e.getCause();
- }
- }
-
- private void deleteConfiguration() throws IOException {
- try (Stream<Path> stream = Files.list(Paths.get(storageDir))) {
- stream
- .filter(Files::isRegularFile)
- .filter(filePath ->
filePath.getFileName().toString().endsWith(CONFIGURATION_FILE_NAME))
- .forEach(
- filePath -> {
- try {
- Files.delete(filePath);
- } catch (IOException e) {
- logger.error(
- "Unexpected error occurs when deleting old configuration
file {}",
- filePath,
- e);
- }
- logger.info("[IoTConsensus Configuration] deleteConfiguration:
{}", filePath);
- });
- } catch (UncheckedIOException e) {
- throw e.getCause();
- }
- }
-
- public void removeDuplicateConfiguration() {
- Set<Peer> seen = new HashSet<>();
- Iterator<Peer> it = configuration.iterator();
-
- while (it.hasNext()) {
- Peer peer = it.next();
- if (!seen.add(peer)) {
- it.remove();
- }
- }
- }
-
/** This method is used for hot reload of IoTConsensusConfig. */
public void reloadConsensusConfig(IoTConsensusConfig config) {
this.config = config;
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 6b33fcc5dfc..6f67bc70c86 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -98,7 +98,7 @@ public class LogDispatcher {
public synchronized void start() {
if (!threads.isEmpty()) {
- threads.forEach(executorService::submit);
+ threads.forEach(logDispatcherThread ->
executorService.submit(logDispatcherThread));
}
}
@@ -120,7 +120,8 @@ public class LogDispatcher {
stopped = true;
}
- public synchronized void addLogDispatcherThread(Peer peer, long
initialSyncIndex) {
+ public synchronized void addLogDispatcherThread(
+ Peer peer, long initialSyncIndex, boolean startNow) {
if (stopped) {
return;
}
@@ -131,7 +132,9 @@ public class LogDispatcher {
if (this.executorService == null) {
initLogSyncThreadPool();
}
- executorService.submit(thread);
+ if (startNow) {
+ executorService.submit(thread);
+ }
}
public synchronized void removeLogDispatcherThread(Peer peer) throws
IOException {
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index f6a67c3e8c6..2bac66738fd 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -189,7 +189,7 @@ public class IoTConsensusRPCServiceProcessor implements
IoTConsensusIService.Ifa
return new TBuildSyncLogChannelRes(status);
}
TSStatus responseStatus;
- impl.buildSyncLogChannel(new Peer(groupId, req.nodeId, req.endPoint));
+ impl.buildSyncLogChannel(new Peer(groupId, req.nodeId, req.endPoint),
true);
responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
return new TBuildSyncLogChannelRes(responseStatus);
}
diff --git
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
index 3ec7769f2ac..f22d3fdadcd 100644
---
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
+++
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
@@ -46,8 +46,8 @@ import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class ReplicateTest {
@@ -57,12 +57,6 @@ public class ReplicateTest {
private final ConsensusGroupId gid = new DataRegionId(1);
- private static final long timeout = TimeUnit.SECONDS.toMillis(300);
-
- private static final String CONFIGURATION_FILE_NAME = "configuration.dat";
-
- private static final String CONFIGURATION_TMP_FILE_NAME =
"configuration.dat.tmp";
-
private int basePort = 9000;
private final List<Peer> peers =
@@ -73,9 +67,9 @@ public class ReplicateTest {
private final List<File> peersStorage =
Arrays.asList(
- new File("target" + java.io.File.separator + "1"),
- new File("target" + java.io.File.separator + "2"),
- new File("target" + java.io.File.separator + "3"));
+ new File("target" + File.separator + "1"),
+ new File("target" + File.separator + "2"),
+ new File("target" + File.separator + "3"));
private final ConsensusGroup group = new ConsensusGroup(gid, peers);
private final List<IoTConsensus> servers = new ArrayList<>();
@@ -120,6 +114,9 @@ public class ReplicateTest {
String.format(
ConsensusFactory.CONSTRUCT_FAILED_MSG,
ConsensusFactory.IOT_CONSENSUS))));
+
servers.get(i).recordCorrectPeerListBeforeStarting(Collections.singletonMap(gid,
peers));
+ }
+ for (int i = 0; i < peers.size(); i++) {
servers.get(i).start();
}
} catch (IOException e) {
@@ -187,23 +184,9 @@ public class ReplicateTest {
stopServer();
initServer();
- Assert.assertEquals(
- peers.stream().map(Peer::getNodeId).collect(Collectors.toSet()),
- servers.get(0).getImpl(gid).getConfiguration().stream()
- .map(Peer::getNodeId)
- .collect(Collectors.toSet()));
-
- Assert.assertEquals(
- peers.stream().map(Peer::getNodeId).collect(Collectors.toSet()),
- servers.get(1).getImpl(gid).getConfiguration().stream()
- .map(Peer::getNodeId)
- .collect(Collectors.toSet()));
-
- Assert.assertEquals(
- peers.stream().map(Peer::getNodeId).collect(Collectors.toSet()),
- servers.get(2).getImpl(gid).getConfiguration().stream()
- .map(Peer::getNodeId)
- .collect(Collectors.toSet()));
+ checkPeerList(servers.get(0).getImpl(gid));
+ checkPeerList(servers.get(1).getImpl(gid));
+ checkPeerList(servers.get(2).getImpl(gid));
Assert.assertEquals(CHECK_POINT_GAP,
servers.get(0).getImpl(gid).getSearchIndex());
Assert.assertEquals(CHECK_POINT_GAP,
servers.get(1).getImpl(gid).getSearchIndex());
@@ -264,23 +247,9 @@ public class ReplicateTest {
initServer();
servers.get(2).createLocalPeer(group.getGroupId(), group.getPeers());
- Assert.assertEquals(
- peers.stream().map(Peer::getNodeId).collect(Collectors.toSet()),
- servers.get(0).getImpl(gid).getConfiguration().stream()
- .map(Peer::getNodeId)
- .collect(Collectors.toSet()));
-
- Assert.assertEquals(
- peers.stream().map(Peer::getNodeId).collect(Collectors.toSet()),
- servers.get(1).getImpl(gid).getConfiguration().stream()
- .map(Peer::getNodeId)
- .collect(Collectors.toSet()));
-
- Assert.assertEquals(
- peers.stream().map(Peer::getNodeId).collect(Collectors.toSet()),
- servers.get(2).getImpl(gid).getConfiguration().stream()
- .map(Peer::getNodeId)
- .collect(Collectors.toSet()));
+ checkPeerList(servers.get(0).getImpl(gid));
+ checkPeerList(servers.get(1).getImpl(gid));
+ checkPeerList(servers.get(2).getImpl(gid));
Assert.assertEquals(CHECK_POINT_GAP,
servers.get(0).getImpl(gid).getSearchIndex());
Assert.assertEquals(CHECK_POINT_GAP,
servers.get(1).getImpl(gid).getSearchIndex());
@@ -345,4 +314,10 @@ public class ReplicateTest {
}
return true;
}
+
+ private void checkPeerList(IoTConsensusServerImpl iotServerImpl) {
+ Assert.assertEquals(
+ peers.stream().map(Peer::getNodeId).collect(Collectors.toSet()),
+
iotServerImpl.getConfiguration().stream().map(Peer::getNodeId).collect(Collectors.toSet()));
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
index 6a771007b3a..f3fe432ae7e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import java.util.Objects;
/** We abstract this class to hide word `ConsensusGroup` for IoTDB
StorageEngine/SchemaEngine. */
-public abstract class ConsensusGroupId {
+public abstract class ConsensusGroupId implements Comparable<ConsensusGroupId>
{
protected int id;
@@ -126,4 +126,9 @@ public abstract class ConsensusGroupId {
return create(tConsensusGroupId.getType().getValue(),
tConsensusGroupId.getId());
}
}
+
+ @Override
+ public int compareTo(ConsensusGroupId o) {
+ return Integer.compare(id, o.id);
+ }
}