This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new b622ace6410 [to dev/1.3] IoTConsensus no longer stores the peer list locally on the DataNode (#14900)
b622ace6410 is described below

commit b622ace641000303a62fb8911ffc56538c9aece2
Author: Li Yu Heng <[email protected]>
AuthorDate: Fri Feb 21 17:29:46 2025 +0800

    [to dev/1.3] IoTConsensus no longer stores the peer list locally on the DataNode (#14900)
    
    * IoTConsensus and IoTConsensusV2 no longer store the peer list locally on the DataNode (#14814)
    
    Co-authored-by: Peng Junzhi <[email protected]>
    
    Co-authored-by: Xiangpeng Hu <[email protected]>
    
    (cherry picked from commit 4bd2f259c50f228913adb7dc61e8321c42bdf43d)
    
    * Fix IoTConsensus safe deleted index (#14883)
    
    * done
    
    * move
    
    (cherry picked from commit 9e973b751cfa4a9bb471f8fdee7100f329f8c27b)
    
    * Fix IoTConsensus safe deleted index again (#14897)
    
    * ?
    
    * spotless
    
    (cherry picked from commit 4aadc09fb200af5d10382c507413eb69b4447710)
    
    * Revert the IoTConsensusV2 part of #14814 (this cherry-pick changes IoTConsensus only)
    
    * Fix integration test: remove the per-peer configuration.dat file checks, which no longer apply
---
 ...IoTDBRegionOperationReliabilityITFramework.java |  72 ------
 .../org/apache/iotdb/consensus/common/Peer.java    |  13 +-
 .../apache/iotdb/consensus/iot/IoTConsensus.java   |  30 ++-
 .../consensus/iot/IoTConsensusServerImpl.java      | 262 +++------------------
 .../consensus/iot/logdispatcher/LogDispatcher.java |   9 +-
 .../service/IoTConsensusRPCServiceProcessor.java   |   2 +-
 .../apache/iotdb/consensus/iot/ReplicateTest.java  |  63 ++---
 .../iotdb/commons/consensus/ConsensusGroupId.java  |   7 +-
 8 files changed, 100 insertions(+), 358 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java
 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java
index 76c42c378d3..0dccc669eeb 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
 import org.apache.iotdb.consensus.ConsensusFactory;
-import org.apache.iotdb.consensus.iot.IoTConsensusServerImpl;
 import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.isession.SessionDataSet;
 import org.apache.iotdb.it.env.EnvFactory;
@@ -95,7 +94,6 @@ public class IoTDBRegionOperationReliabilityITFramework {
   private static final String SHOW_DATANODES = "show datanodes";
   private static final String COUNT_TIMESERIES = "select count(*) from 
root.sg.**";
   private static final String REGION_MIGRATE_COMMAND_FORMAT = "migrate region 
%d from %d to %d";
-  private static final String CONFIGURATION_FILE_NAME = "configuration.dat";
   ExecutorService executorService = 
IoTDBThreadPoolFactory.newCachedThreadPool("regionMigrateIT");
 
   public static Consumer<KillPointContext> actionOfKillNode =
@@ -232,7 +230,6 @@ public class IoTDBRegionOperationReliabilityITFramework {
       final int destDataNode =
           selectDataNodeNotContainsRegion(allDataNodeId, regionMap, 
selectedRegion);
       checkRegionFileExist(originalDataNode);
-      checkPeersExist(regionMap.get(selectedRegion), originalDataNode, 
selectedRegion);
 
       // set kill points
       if (killNode == KillNode.ORIGINAL_DATANODE) {
@@ -300,12 +297,10 @@ public class IoTDBRegionOperationReliabilityITFramework {
       if (success) {
         checkRegionFileClearIfNodeAlive(originalDataNode);
         checkRegionFileExistIfNodeAlive(destDataNode);
-        checkPeersClearIfNodeAlive(allDataNodeId, originalDataNode, 
selectedRegion);
         checkClusterStillWritable();
       } else {
         checkRegionFileClearIfNodeAlive(destDataNode);
         checkRegionFileExistIfNodeAlive(originalDataNode);
-        checkPeersClearIfNodeAlive(allDataNodeId, destDataNode, 
selectedRegion);
       }
 
       LOGGER.info("test pass");
@@ -623,63 +618,6 @@ public class IoTDBRegionOperationReliabilityITFramework {
     LOGGER.info("Original DataNode {} region file clear", dataNode);
   }
 
-  private static void checkPeersExistIfNodeAlive(
-      Set<Integer> dataNodes, int originalDataNode, int regionId) {
-    dataNodes.forEach(
-        targetDataNode -> checkPeerExistIfNodeAlive(targetDataNode, 
originalDataNode, regionId));
-  }
-
-  private static void checkPeersExist(Set<Integer> dataNodes, int 
originalDataNode, int regionId) {
-    dataNodes.forEach(targetDataNode -> checkPeerExist(targetDataNode, 
originalDataNode, regionId));
-  }
-
-  private static void checkPeerExistIfNodeAlive(
-      int checkTargetDataNode, int originalDataNode, int regionId) {
-    if 
(EnvFactory.getEnv().dataNodeIdToWrapper(checkTargetDataNode).get().isAlive()) {
-      checkPeerExist(checkTargetDataNode, originalDataNode, regionId);
-    }
-  }
-
-  private static void checkPeerExist(int checkTargetDataNode, int 
originalDataNode, int regionId) {
-    File expectExistedFile =
-        new File(buildConfigurationDataFilePath(checkTargetDataNode, 
originalDataNode, regionId));
-    Assert.assertTrue(
-        "configuration file should exist, but it didn't: " + 
expectExistedFile.getPath(),
-        expectExistedFile.exists());
-  }
-
-  private static void checkPeersClearIfNodeAlive(
-      Set<Integer> dataNodes, int originalDataNode, int regionId) {
-    dataNodes.stream()
-        .filter(dataNode -> dataNode != originalDataNode)
-        .forEach(
-            targetDataNode ->
-                checkPeerClearIfNodeAlive(targetDataNode, originalDataNode, 
regionId));
-  }
-
-  private static void checkPeersClear(Set<Integer> dataNodes, int 
originalDataNode, int regionId) {
-    dataNodes.stream()
-        .filter(dataNode -> dataNode != originalDataNode)
-        .forEach(targetDataNode -> checkPeerClear(targetDataNode, 
originalDataNode, regionId));
-    LOGGER.info("Peer clear");
-  }
-
-  private static void checkPeerClearIfNodeAlive(
-      int checkTargetDataNode, int originalDataNode, int regionId) {
-    if 
(EnvFactory.getEnv().dataNodeIdToWrapper(checkTargetDataNode).get().isAlive()) {
-      checkPeerClear(checkTargetDataNode, originalDataNode, regionId);
-    }
-  }
-
-  private static void checkPeerClear(int checkTargetDataNode, int 
originalDataNode, int regionId) {
-    File expectDeletedFile =
-        new File(buildConfigurationDataFilePath(checkTargetDataNode, 
originalDataNode, regionId));
-    Assert.assertFalse(
-        "configuration file should be deleted, but it didn't: " + 
expectDeletedFile.getPath(),
-        expectDeletedFile.exists());
-    LOGGER.info("configuration file has been deleted: {}", 
expectDeletedFile.getPath());
-  }
-
   private void checkClusterStillWritable() {
     try (Connection connection = EnvFactory.getEnv().getConnection();
         Statement statement = connection.createStatement()) {
@@ -728,16 +666,6 @@ public class IoTDBRegionOperationReliabilityITFramework {
         + (isSequence ? IoTDBConstant.SEQUENCE_FOLDER_NAME : 
IoTDBConstant.UNSEQUENCE_FOLDER_NAME);
   }
 
-  private static String buildConfigurationDataFilePath(
-      int localDataNodeId, int remoteDataNodeId, int regionId) {
-    String configurationDatDirName =
-        buildRegionDirPath(localDataNodeId) + File.separator + "1_" + regionId;
-    String expectDeletedFileName =
-        IoTConsensusServerImpl.generateConfigurationDatFileName(
-            remoteDataNodeId, CONFIGURATION_FILE_NAME);
-    return configurationDatDirName + File.separator + expectDeletedFileName;
-  }
-
   protected static KeySetView<String, Boolean> noKillPoints() {
     return ConcurrentHashMap.newKeySet();
   }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
index e60efc6565f..bc6ec923aaf 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
@@ -35,14 +35,15 @@ import org.slf4j.LoggerFactory;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Comparator;
 import java.util.Objects;
 
-public class Peer {
+public class Peer implements Comparable<Peer> {
 
   private final Logger logger = LoggerFactory.getLogger(Peer.class);
   private final ConsensusGroupId groupId;
-  private final TEndPoint endpoint;
   private final int nodeId;
+  private final TEndPoint endpoint;
 
   public Peer(ConsensusGroupId groupId, int nodeId, TEndPoint endpoint) {
     this.groupId = groupId;
@@ -105,6 +106,14 @@ public class Peer {
     return "Peer{" + "groupId=" + groupId + ", endpoint=" + endpoint + ", 
nodeId=" + nodeId + '}';
   }
 
+  @Override
+  public int compareTo(Peer peer) {
+    return Comparator.comparing(Peer::getGroupId)
+        .thenComparingInt(Peer::getNodeId)
+        .thenComparing(Peer::getEndpoint)
+        .compare(this, peer);
+  }
+
   public static Peer valueOf(
       TConsensusGroupId consensusGroupId, TDataNodeLocation dataNodeLocation) {
     if (consensusGroupId.getType() == TConsensusGroupType.SchemaRegion) {
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 7f41f4bd582..e3a356bdb1b 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -75,6 +75,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
@@ -174,7 +175,7 @@ public class IoTConsensus implements IConsensus {
               new IoTConsensusServerImpl(
                   path.toString(),
                   new Peer(consensusGroupId, thisNodeId, thisNode),
-                  new ArrayList<>(),
+                  new TreeSet<>(),
                   registry.apply(consensusGroupId),
                   backgroundTaskService,
                   clientManager,
@@ -188,7 +189,7 @@ public class IoTConsensus implements IConsensus {
       BiConsumer<ConsensusGroupId, List<Peer>> resetPeerListWithoutThrow =
           (consensusGroupId, peers) -> {
             try {
-              resetPeerList(consensusGroupId, peers);
+              resetPeerListImpl(consensusGroupId, peers, false);
             } catch (ConsensusGroupNotExistException ignore) {
 
             } catch (Exception e) {
@@ -281,7 +282,7 @@ public class IoTConsensus implements IConsensus {
                       new IoTConsensusServerImpl(
                           path,
                           new Peer(groupId, thisNodeId, thisNode),
-                          peers,
+                          new TreeSet<>(peers),
                           registry.apply(groupId),
                           backgroundTaskService,
                           clientManager,
@@ -490,6 +491,12 @@ public class IoTConsensus implements IConsensus {
   @Override
   public void resetPeerList(ConsensusGroupId groupId, List<Peer> correctPeers)
       throws ConsensusException {
+    resetPeerListImpl(groupId, correctPeers, true);
+  }
+
+  private void resetPeerListImpl(
+      ConsensusGroupId groupId, List<Peer> correctPeers, boolean startNow)
+      throws ConsensusException {
     IoTConsensusServerImpl impl =
         Optional.ofNullable(stateMachineMap.get(groupId))
             .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
@@ -497,7 +504,7 @@ public class IoTConsensus implements IConsensus {
     Peer localPeer = new Peer(groupId, thisNodeId, thisNode);
     if (!correctPeers.contains(localPeer)) {
       logger.info(
-          "[RESET PEER LIST] Local peer is not in the correct configuration, 
delete local peer {}",
+          "[RESET PEER LIST] {} Local peer is not in the correct 
configuration, delete it.",
           groupId);
       deleteLocalPeer(groupId);
       return;
@@ -510,29 +517,32 @@ public class IoTConsensus implements IConsensus {
       for (Peer peer : currentMembers) {
         if (!correctPeers.contains(peer)) {
           if (!impl.removeSyncLogChannel(peer)) {
-            logger.error("[RESET PEER LIST] Failed to remove sync channel 
with: {}", peer);
+            logger.error(
+                "[RESET PEER LIST] {} Failed to remove sync channel with: {}", 
groupId, peer);
           } else {
-            logger.info("[RESET PEER LIST] Remove sync channel with: {}", 
peer);
+            logger.info("[RESET PEER LIST] {} Remove sync channel with: {}", 
groupId, peer);
           }
         }
       }
       // add correct peer
       for (Peer peer : correctPeers) {
         if (!impl.getConfiguration().contains(peer)) {
-          impl.buildSyncLogChannel(peer);
-          logger.info("[RESET PEER LIST] Build sync channel with: {}", peer);
+          impl.buildSyncLogChannel(peer, startNow);
+          logger.info("[RESET PEER LIST] {} Build sync channel with: {}", 
groupId, peer);
         }
       }
       // show result
       String newPeerListStr = impl.getConfiguration().toString();
       if (!previousPeerListStr.equals(newPeerListStr)) {
         logger.info(
-            "[RESET PEER LIST] Local peer list has been reset: {} -> {}",
+            "[RESET PEER LIST] {} Local peer list has been reset: {} -> {}",
+            groupId,
             previousPeerListStr,
             newPeerListStr);
       } else {
         logger.info(
-            "[RESET PEER LIST] The current peer list is correct, nothing need 
to be reset: {}",
+            "[RESET PEER LIST] {} The current peer list is correct, nothing 
need to be reset: {}",
+            groupId,
             previousPeerListStr);
       }
     }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index ee049ee4d9c..a13a19dde8d 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -73,11 +73,9 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.Files;
@@ -85,12 +83,11 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Objects;
 import java.util.PriorityQueue;
-import java.util.Set;
+import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
@@ -100,15 +97,11 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import static 
org.apache.iotdb.commons.utils.FileUtils.humanReadableByteCountSI;
 
 public class IoTConsensusServerImpl {
 
-  private static final String CONFIGURATION_FILE_NAME = "configuration.dat";
-  private static final String CONFIGURATION_TMP_FILE_NAME = 
"configuration.dat.tmp";
   public static final String SNAPSHOT_DIR_NAME = "snapshot";
   private static final Pattern SNAPSHOT_INDEX_PATTEN = 
Pattern.compile(".*[^\\d](?=(\\d+))");
   private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS 
=
@@ -120,7 +113,7 @@ public class IoTConsensusServerImpl {
   private final Lock stateMachineLock = new ReentrantLock();
   private final Condition stateMachineCondition = 
stateMachineLock.newCondition();
   private final String storageDir;
-  private final List<Peer> configuration;
+  private final TreeSet<Peer> configuration;
   private final AtomicLong searchIndex;
   private final LogDispatcher logDispatcher;
   private IoTConsensusConfig config;
@@ -137,7 +130,7 @@ public class IoTConsensusServerImpl {
   public IoTConsensusServerImpl(
       String storageDir,
       Peer thisNode,
-      List<Peer> configuration,
+      TreeSet<Peer> configuration,
       IStateMachine stateMachine,
       ScheduledExecutorService backgroundTaskService,
       IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager,
@@ -150,25 +143,14 @@ public class IoTConsensusServerImpl {
     this.cacheQueueMap = new ConcurrentHashMap<>();
     this.syncClientManager = syncClientManager;
     this.configuration = configuration;
-    if (configuration.isEmpty()) {
-      recoverConfiguration();
-    } else {
-      persistConfiguration();
-    }
     this.backgroundTaskService = backgroundTaskService;
     this.config = config;
     this.consensusGroupId = thisNode.getGroupId().toString();
-    consensusReqReader = (ConsensusReqReader) stateMachine.read(new 
GetConsensusReqReaderPlan());
+    this.consensusReqReader =
+        (ConsensusReqReader) stateMachine.read(new 
GetConsensusReqReaderPlan());
     this.searchIndex = new 
AtomicLong(consensusReqReader.getCurrentSearchIndex());
     this.ioTConsensusServerMetrics = new IoTConsensusServerMetrics(this);
     this.logDispatcher = new LogDispatcher(this, clientManager);
-    // Since the underlying wal does not persist safelyDeletedSearchIndex, 
IoTConsensus needs to
-    // update wal with its syncIndex recovered from the consensus layer when 
initializing.
-    // This prevents wal from being piled up if the safelyDeletedSearchIndex 
is not updated after
-    // the restart and Leader migration occurs
-    checkAndUpdateSafeDeletedSearchIndex();
-    // see message in logs for details
-    checkAndUpdateSearchIndex();
   }
 
   public IStateMachine getStateMachine() {
@@ -176,6 +158,7 @@ public class IoTConsensusServerImpl {
   }
 
   public void start() {
+    checkAndUpdateIndex();
     MetricService.getInstance().addMetricSet(this.ioTConsensusServerMetrics);
     stateMachine.start();
     logDispatcher.start();
@@ -498,7 +481,7 @@ public class IoTConsensusServerImpl {
       if (peer.equals(thisNode)) {
         // use searchIndex for thisNode as the initialSyncIndex because 
targetPeer will load the
         // snapshot produced by thisNode
-        buildSyncLogChannel(targetPeer);
+        buildSyncLogChannel(targetPeer, true);
       } else {
         // use RPC to tell other peers to build sync log channel to target peer
         try (SyncIoTConsensusServiceClient client =
@@ -646,23 +629,22 @@ public class IoTConsensusServerImpl {
   }
 
   /** build SyncLog channel with safeIndex as the default initial sync index. 
*/
-  public void buildSyncLogChannel(Peer targetPeer) {
-    buildSyncLogChannel(targetPeer, getMinSyncIndex());
+  public void buildSyncLogChannel(Peer targetPeer, boolean startNow) {
+    buildSyncLogChannel(targetPeer, getMinSyncIndex(), startNow);
   }
 
-  public void buildSyncLogChannel(Peer targetPeer, long initialSyncIndex) {
+  public void buildSyncLogChannel(Peer targetPeer, long initialSyncIndex, 
boolean startNow) {
     KillPoint.setKillPoint(DataNodeKillPoints.ORIGINAL_ADD_PEER_DONE);
-    // step 1, build sync channel in LogDispatcher
+    configuration.add(targetPeer);
+    if (Objects.equals(targetPeer, thisNode)) {
+      return;
+    }
+    logDispatcher.addLogDispatcherThread(targetPeer, initialSyncIndex, 
startNow);
     logger.info(
-        "[IoTConsensus] build sync log channel to {} with initialSyncIndex {}",
+        "[IoTConsensus] Successfully build sync log channel to {} with 
initialSyncIndex {}. {}",
         targetPeer,
-        initialSyncIndex);
-    logDispatcher.addLogDispatcherThread(targetPeer, initialSyncIndex);
-    // step 2, update configuration
-    configuration.add(targetPeer);
-    // step 3, persist configuration
-    persistConfiguration();
-    logger.info("[IoTConsensus Configuration] persist new configuration: {}", 
configuration);
+        initialSyncIndex,
+        startNow ? "Sync log channel has started." : "Sync log channel maybe 
start later.");
   }
 
   /**
@@ -689,8 +671,6 @@ public class IoTConsensusServerImpl {
     // step 2, update configuration
     configuration.remove(targetPeer);
     checkAndUpdateSafeDeletedSearchIndex();
-    // step 3, persist configuration
-    persistConfiguration();
     logger.info(
         "[IoTConsensus Configuration] Configuration updated to {}. {}",
         this.configuration,
@@ -698,103 +678,6 @@ public class IoTConsensusServerImpl {
     return !exceptionHappened;
   }
 
-  public void persistConfiguration() {
-    try {
-      removeDuplicateConfiguration();
-      renameTmpConfigurationFileToRemoveSuffix();
-      serializeConfigurationAndFsyncToDisk();
-      deleteConfiguration();
-      renameTmpConfigurationFileToRemoveSuffix();
-    } catch (IOException e) {
-      // TODO: (xingtanzjr) need to handle the IOException because the 
IoTConsensus won't
-      // work expectedly
-      //  if the exception occurs
-      logger.error("Unexpected error occurs when persisting configuration", e);
-    }
-  }
-
-  public void recoverConfiguration() {
-    try {
-      Path tmpConfigurationPath =
-          Paths.get(new File(storageDir, 
CONFIGURATION_TMP_FILE_NAME).getAbsolutePath());
-      Path configurationPath =
-          Paths.get(new File(storageDir, 
CONFIGURATION_FILE_NAME).getAbsolutePath());
-      // If the tmpConfigurationPath exists, it means the 
`persistConfigurationUpdate` is
-      // interrupted
-      // unexpectedly, we need substitute configuration with tmpConfiguration 
file
-      if (Files.exists(tmpConfigurationPath)) {
-        Files.deleteIfExists(configurationPath);
-        Files.move(tmpConfigurationPath, configurationPath);
-        logger.info(
-            "[IoTConsensus Configuration] recover configuration from 
tmpConfigurationFile, {}",
-            tmpConfigurationPath);
-      }
-      if (Files.exists(configurationPath)) {
-        recoverFromOldConfigurationFile(configurationPath);
-        logger.info(
-            "[IoTConsensus Configuration] recover configuration from 
oldConfigurationFile, {}",
-            configurationPath);
-      } else {
-        // recover from split configuration file
-        logger.info(
-            "[IoTConsensus Configuration] recover configuration from old split 
configuration file");
-        Path dirPath = Paths.get(storageDir);
-        List<Peer> tmpPeerList = getConfiguration(dirPath, 
CONFIGURATION_TMP_FILE_NAME);
-        configuration.addAll(tmpPeerList);
-        logger.info(
-            "[IoTConsensus Configuration] recover configuration from 
tmpPeerList, {}",
-            configuration);
-        List<Peer> peerList = getConfiguration(dirPath, 
CONFIGURATION_FILE_NAME);
-        for (Peer peer : peerList) {
-          if (!configuration.contains(peer)) {
-            configuration.add(peer);
-          }
-        }
-        logger.info(
-            "[IoTConsensus Configuration] recover configuration from peerList, 
{}", configuration);
-        persistConfiguration();
-      }
-      logger.info("Recover IoTConsensus server Impl, configuration: {}", 
configuration);
-    } catch (IOException e) {
-      logger.error("Unexpected error occurs when recovering configuration", e);
-    }
-  }
-
-  // @Compatibility
-  private void recoverFromOldConfigurationFile(Path oldConfigurationPath) 
throws IOException {
-    // recover from old configuration file
-    ByteBuffer buffer = 
ByteBuffer.wrap(Files.readAllBytes(oldConfigurationPath));
-    int size = buffer.getInt();
-    for (int i = 0; i < size; i++) {
-      configuration.add(Peer.deserialize(buffer));
-    }
-    persistConfiguration();
-  }
-
-  public static String generateConfigurationDatFileName(int nodeId, String 
suffix) {
-    return nodeId + "_" + suffix;
-  }
-
-  private List<Peer> getConfiguration(Path dirPath, String 
configurationFileName)
-      throws IOException {
-    ByteBuffer buffer;
-    List<Peer> tmpConfiguration = new ArrayList<>();
-    Path[] files =
-        Files.walk(dirPath)
-            .filter(Files::isRegularFile)
-            .filter(filePath -> 
filePath.getFileName().toString().contains(configurationFileName))
-            .toArray(Path[]::new);
-    logger.info(
-        "[IoTConsensus Configuration] getConfiguration: fileName, {}, 
fileList: {}",
-        configurationFileName,
-        files);
-    for (Path file : files) {
-      buffer = ByteBuffer.wrap(Files.readAllBytes(file));
-      tmpConfiguration.add(Peer.deserialize(buffer));
-    }
-    return tmpConfiguration;
-  }
-
   public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest(
       IConsensusRequest request) {
     if (request instanceof ComparableConsensusRequest) {
@@ -832,7 +715,7 @@ public class IoTConsensusServerImpl {
   }
 
   public List<Peer> getConfiguration() {
-    return configuration;
+    return new ArrayList<>(configuration);
   }
 
   public long getSearchIndex() {
@@ -943,12 +826,25 @@ public class IoTConsensusServerImpl {
     }
   }
 
+  void checkAndUpdateIndex() {
+    // Since the underlying wal does not persist safelyDeletedSearchIndex, 
IoTConsensus needs to
+    // update wal with its syncIndex recovered from the consensus layer when 
initializing.
+    // This prevents wal from being piled up if the safelyDeletedSearchIndex 
is not updated after
+    // the restart and Leader migration occurs
+    checkAndUpdateSafeDeletedSearchIndex();
+    // see message in logs for details
+    checkAndUpdateSearchIndex();
+  }
+
   /**
-   * If there is only one replica, set it to Long.MAX_VALUE.、 If there are 
multiple replicas, get
-   * the latest SafelyDeletedSearchIndex again. This enables wal to be deleted 
in a timely manner.
+   * If there is only one replica, set it to Long.MAX_VALUE. If there are 
multiple replicas, get the
+   * latest SafelyDeletedSearchIndex again. This enables wal to be deleted in 
a timely manner.
    */
-  public void checkAndUpdateSafeDeletedSearchIndex() {
-    if (configuration.size() == 1) {
+  void checkAndUpdateSafeDeletedSearchIndex() {
+    if (configuration.isEmpty()) {
+      logger.error(
+          "Configuration is empty, which is unexpected. Safe deleted search 
index won't be updated this time.");
+    } else if (configuration.size() == 1) {
       consensusReqReader.setSafelyDeletedSearchIndex(Long.MAX_VALUE);
     } else {
       consensusReqReader.setSafelyDeletedSearchIndex(getMinFlushedSyncIndex());
@@ -984,90 +880,6 @@ public class IoTConsensusServerImpl {
     return consensusGroupId;
   }
 
-  private void serializeConfigurationAndFsyncToDisk() throws IOException {
-    for (Peer peer : configuration) {
-      String peerConfigurationFileName =
-          generateConfigurationDatFileName(peer.getNodeId(), 
CONFIGURATION_TMP_FILE_NAME);
-      FileOutputStream fileOutputStream =
-          new FileOutputStream(new File(storageDir, 
peerConfigurationFileName));
-      try (DataOutputStream outputStream = new 
DataOutputStream(fileOutputStream)) {
-        peer.serialize(outputStream);
-      } finally {
-        try {
-          fileOutputStream.flush();
-          fileOutputStream.getFD().sync();
-        } catch (IOException ignore) {
-          // ignore sync exception
-        }
-      }
-      logger.info("[IoTConsensus Configuration] serializeConfiguration: {}", 
peer);
-    }
-  }
-
-  private void renameTmpConfigurationFileToRemoveSuffix() throws IOException {
-    try (Stream<Path> stream = Files.list(Paths.get(storageDir))) {
-      List<Path> paths =
-          stream
-              .filter(Files::isRegularFile)
-              .filter(
-                  filePath ->
-                      
filePath.getFileName().toString().endsWith(CONFIGURATION_TMP_FILE_NAME))
-              .collect(Collectors.toList());
-      for (Path filePath : paths) {
-        String targetPath =
-            filePath.toString().replace(CONFIGURATION_TMP_FILE_NAME, 
CONFIGURATION_FILE_NAME);
-        File targetFile = new File(targetPath);
-        if (targetFile.exists()) {
-          try {
-            Files.delete(targetFile.toPath());
-          } catch (IOException e) {
-            logger.error("Unexpected error occurs when delete file: {}", 
targetPath, e);
-          }
-        }
-        if (!filePath.toFile().renameTo(targetFile)) {
-          logger.error("Unexpected error occurs when rename file: {} -> {}", 
filePath, targetPath);
-        }
-        logger.info("[IoTConsensus Configuration] renameTmpConfigurationFile: 
{}", targetPath);
-      }
-    } catch (UncheckedIOException e) {
-      throw e.getCause();
-    }
-  }
-
-  private void deleteConfiguration() throws IOException {
-    try (Stream<Path> stream = Files.list(Paths.get(storageDir))) {
-      stream
-          .filter(Files::isRegularFile)
-          .filter(filePath -> 
filePath.getFileName().toString().endsWith(CONFIGURATION_FILE_NAME))
-          .forEach(
-              filePath -> {
-                try {
-                  Files.delete(filePath);
-                } catch (IOException e) {
-                  logger.error(
-                      "Unexpected error occurs when deleting old configuration 
file {}",
-                      filePath,
-                      e);
-                }
-                logger.info("[IoTConsensus Configuration] deleteConfiguration: 
{}", filePath);
-              });
-    } catch (UncheckedIOException e) {
-      throw e.getCause();
-    }
-  }
-
-  public void removeDuplicateConfiguration() {
-    Set<Peer> seen = new HashSet<>();
-    Iterator<Peer> it = configuration.iterator();
-
-    while (it.hasNext()) {
-      Peer peer = it.next();
-      if (!seen.add(peer)) {
-        it.remove();
-      }
-    }
-  }
-
   /** This method is used for hot reload of IoTConsensusConfig. */
   public void reloadConsensusConfig(IoTConsensusConfig config) {
     this.config = config;
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 6b33fcc5dfc..6f67bc70c86 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -98,7 +98,7 @@ public class LogDispatcher {
 
   public synchronized void start() {
     if (!threads.isEmpty()) {
-      threads.forEach(executorService::submit);
+      threads.forEach(logDispatcherThread -> 
executorService.submit(logDispatcherThread));
     }
   }
 
@@ -120,7 +120,8 @@ public class LogDispatcher {
     stopped = true;
   }
 
-  public synchronized void addLogDispatcherThread(Peer peer, long 
initialSyncIndex) {
+  public synchronized void addLogDispatcherThread(
+      Peer peer, long initialSyncIndex, boolean startNow) {
     if (stopped) {
       return;
     }
@@ -131,7 +132,9 @@ public class LogDispatcher {
     if (this.executorService == null) {
       initLogSyncThreadPool();
     }
-    executorService.submit(thread);
+    if (startNow) {
+      executorService.submit(thread);
+    }
   }
 
   public synchronized void removeLogDispatcherThread(Peer peer) throws 
IOException {
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index f6a67c3e8c6..2bac66738fd 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -189,7 +189,7 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Ifa
       return new TBuildSyncLogChannelRes(status);
     }
     TSStatus responseStatus;
-    impl.buildSyncLogChannel(new Peer(groupId, req.nodeId, req.endPoint));
+    impl.buildSyncLogChannel(new Peer(groupId, req.nodeId, req.endPoint), 
true);
     responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     return new TBuildSyncLogChannelRes(responseStatus);
   }
diff --git 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
index 3ec7769f2ac..f22d3fdadcd 100644
--- 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
+++ 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
@@ -46,8 +46,8 @@ import java.io.IOException;
 import java.net.ServerSocket;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 public class ReplicateTest {
@@ -57,12 +57,6 @@ public class ReplicateTest {
 
   private final ConsensusGroupId gid = new DataRegionId(1);
 
-  private static final long timeout = TimeUnit.SECONDS.toMillis(300);
-
-  private static final String CONFIGURATION_FILE_NAME = "configuration.dat";
-
-  private static final String CONFIGURATION_TMP_FILE_NAME = 
"configuration.dat.tmp";
-
   private int basePort = 9000;
 
   private final List<Peer> peers =
@@ -73,9 +67,9 @@ public class ReplicateTest {
 
   private final List<File> peersStorage =
       Arrays.asList(
-          new File("target" + java.io.File.separator + "1"),
-          new File("target" + java.io.File.separator + "2"),
-          new File("target" + java.io.File.separator + "3"));
+          new File("target" + File.separator + "1"),
+          new File("target" + File.separator + "2"),
+          new File("target" + File.separator + "3"));
 
   private final ConsensusGroup group = new ConsensusGroup(gid, peers);
   private final List<IoTConsensus> servers = new ArrayList<>();
@@ -120,6 +114,9 @@ public class ReplicateTest {
                                 String.format(
                                     ConsensusFactory.CONSTRUCT_FAILED_MSG,
                                     ConsensusFactory.IOT_CONSENSUS))));
+        
servers.get(i).recordCorrectPeerListBeforeStarting(Collections.singletonMap(gid,
 peers));
+      }
+      for (int i = 0; i < peers.size(); i++) {
         servers.get(i).start();
       }
     } catch (IOException e) {
@@ -187,23 +184,9 @@ public class ReplicateTest {
       stopServer();
       initServer();
 
-      Assert.assertEquals(
-          peers.stream().map(Peer::getNodeId).collect(Collectors.toSet()),
-          servers.get(0).getImpl(gid).getConfiguration().stream()
-              .map(Peer::getNodeId)
-              .collect(Collectors.toSet()));
-
-      Assert.assertEquals(
-          peers.stream().map(Peer::getNodeId).collect(Collectors.toSet()),
-          servers.get(1).getImpl(gid).getConfiguration().stream()
-              .map(Peer::getNodeId)
-              .collect(Collectors.toSet()));
-
-      Assert.assertEquals(
-          peers.stream().map(Peer::getNodeId).collect(Collectors.toSet()),
-          servers.get(2).getImpl(gid).getConfiguration().stream()
-              .map(Peer::getNodeId)
-              .collect(Collectors.toSet()));
+      checkPeerList(servers.get(0).getImpl(gid));
+      checkPeerList(servers.get(1).getImpl(gid));
+      checkPeerList(servers.get(2).getImpl(gid));
 
       Assert.assertEquals(CHECK_POINT_GAP, 
servers.get(0).getImpl(gid).getSearchIndex());
       Assert.assertEquals(CHECK_POINT_GAP, 
servers.get(1).getImpl(gid).getSearchIndex());
@@ -264,23 +247,9 @@ public class ReplicateTest {
       initServer();
 
       servers.get(2).createLocalPeer(group.getGroupId(), group.getPeers());
-      Assert.assertEquals(
-          peers.stream().map(Peer::getNodeId).collect(Collectors.toSet()),
-          servers.get(0).getImpl(gid).getConfiguration().stream()
-              .map(Peer::getNodeId)
-              .collect(Collectors.toSet()));
-
-      Assert.assertEquals(
-          peers.stream().map(Peer::getNodeId).collect(Collectors.toSet()),
-          servers.get(1).getImpl(gid).getConfiguration().stream()
-              .map(Peer::getNodeId)
-              .collect(Collectors.toSet()));
-
-      Assert.assertEquals(
-          peers.stream().map(Peer::getNodeId).collect(Collectors.toSet()),
-          servers.get(2).getImpl(gid).getConfiguration().stream()
-              .map(Peer::getNodeId)
-              .collect(Collectors.toSet()));
+      checkPeerList(servers.get(0).getImpl(gid));
+      checkPeerList(servers.get(1).getImpl(gid));
+      checkPeerList(servers.get(2).getImpl(gid));
 
       Assert.assertEquals(CHECK_POINT_GAP, 
servers.get(0).getImpl(gid).getSearchIndex());
       Assert.assertEquals(CHECK_POINT_GAP, 
servers.get(1).getImpl(gid).getSearchIndex());
@@ -345,4 +314,10 @@ public class ReplicateTest {
     }
     return true;
   }
+
+  private void checkPeerList(IoTConsensusServerImpl iotServerImpl) {
+    Assert.assertEquals(
+        peers.stream().map(Peer::getNodeId).collect(Collectors.toSet()),
+        
iotServerImpl.getConfiguration().stream().map(Peer::getNodeId).collect(Collectors.toSet()));
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
index 6a771007b3a..f3fe432ae7e 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import java.util.Objects;
 
 /** We abstract this class to hide word `ConsensusGroup` for IoTDB 
StorageEngine/SchemaEngine. */
-public abstract class ConsensusGroupId {
+public abstract class ConsensusGroupId implements Comparable<ConsensusGroupId> 
{
 
   protected int id;
 
@@ -126,4 +126,9 @@ public abstract class ConsensusGroupId {
       return create(tConsensusGroupId.getType().getValue(), 
tConsensusGroupId.getId());
     }
   }
+
+  @Override
+  public int compareTo(ConsensusGroupId o) {
+    return Integer.compare(id, o.id);
+  }
 }


Reply via email to