This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch region_migration
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/region_migration by this push:
     new e6d0e4d8ed7 Split configuration and add resetPeerList for region migration (#12101)
e6d0e4d8ed7 is described below

commit e6d0e4d8ed7378d7be58dd8e90b02c400161a1ce
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Tue Mar 5 14:40:51 2024 +0800

    Split configuration and add resetPeerList for region migration (#12101)
---
 .../org/apache/iotdb/consensus/IConsensus.java     | 13 +++
 .../apache/iotdb/consensus/iot/IoTConsensus.java   | 27 ++++++
 .../consensus/iot/IoTConsensusServerImpl.java      | 97 +++++++++++++++-------
 .../iotdb/consensus/ratis/RatisConsensus.java      | 37 +++++++++
 .../iotdb/consensus/simple/SimpleConsensus.java    |  6 ++
 5 files changed, 151 insertions(+), 29 deletions(-)

diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
index 4ee908a7af8..dc3942ca071 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
@@ -143,6 +143,19 @@ public interface IConsensus {
    */
   void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusException;
 
+  /**
+   * Reset the peer list of the corresponding consensus group. Currently only used in the automatic
+   * cleanup of region migration as a rollback for {@link #addRemotePeer(ConsensusGroupId, Peer)},
+   * so it will only be less but not more.
+   *
+   * @param groupId the consensus group
+   * @param peers the new peer list
+   * @return reset result
+   * @throws ConsensusException when resetPeerList doesn't success with other reasons
+   * @throws ConsensusGroupNotExistException when the specified consensus group doesn't exist
+   */
+  TSStatus resetPeerList(ConsensusGroupId groupId, List<Peer> peers) throws ConsensusException;
+
   // management API
 
   /**
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 94e116882e0..fd42f127244 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -393,6 +393,33 @@ public class IoTConsensus implements IConsensus {
     return new ArrayList<>(stateMachineMap.keySet());
   }
 
+  public TSStatus resetPeerList(ConsensusGroupId groupId, List<Peer> peers)
+      throws ConsensusException {
+    IoTConsensusServerImpl impl =
+        Optional.ofNullable(stateMachineMap.get(groupId))
+            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
+    if (impl.isReadOnly()) {
+      return StatusUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY);
+    } else if (!impl.isActive()) {
+      return RpcUtils.getStatus(
+          TSStatusCode.WRITE_PROCESS_REJECT,
+          "peer is inactive and not ready to receive reset configuration request.");
+    } else {
+      for (Peer peer : impl.getConfiguration()) {
+        if (!peers.contains(peer)) {
+          try {
+            removeRemotePeer(groupId, peer);
+          } catch (ConsensusException e) {
+            logger.error("Failed to remove peer {} from group {}", peer, groupId, e);
+            return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
+          }
+        }
+      }
+      impl.resetConfiguration(peers);
+      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+    }
+  }
+
   public IoTConsensusServerImpl getImpl(ConsensusGroupId groupId) {
     return stateMachineMap.get(groupId);
   }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 800754ddba6..6fd4bebae16 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -605,25 +605,13 @@ public class IoTConsensusServerImpl {
   public void persistConfigurationUpdate() throws ConsensusGroupModifyPeerException {
     try {
       serializeConfigurationAndFsyncToDisk(CONFIGURATION_TMP_FILE_NAME);
-      Path tmpConfigurationPath =
-          Paths.get(new File(storageDir, CONFIGURATION_TMP_FILE_NAME).getAbsolutePath());
-      Path configurationPath =
-          Paths.get(new File(storageDir, CONFIGURATION_FILE_NAME).getAbsolutePath());
-      Files.deleteIfExists(configurationPath);
-      Files.move(tmpConfigurationPath, configurationPath);
+      tmpConfigurationUpdate(configuration);
     } catch (IOException e) {
       throw new ConsensusGroupModifyPeerException(
           "Unexpected error occurs when update configuration", e);
     }
   }
 
-  private void serializeConfigurationTo(DataOutputStream outputStream) throws IOException {
-    outputStream.writeInt(configuration.size());
-    for (Peer peer : configuration) {
-      peer.serialize(outputStream);
-    }
-  }
-
   public void recoverConfiguration() {
     ByteBuffer buffer;
     try {
@@ -635,15 +623,25 @@ public class IoTConsensusServerImpl {
       // interrupted
       // unexpectedly, we need substitute configuration with tmpConfiguration file
       if (Files.exists(tmpConfigurationPath)) {
-        if (Files.exists(configurationPath)) {
-          Files.delete(configurationPath);
-        }
+        Files.deleteIfExists(configurationPath);
         Files.move(tmpConfigurationPath, configurationPath);
       }
-      buffer = ByteBuffer.wrap(Files.readAllBytes(configurationPath));
-      int size = buffer.getInt();
-      for (int i = 0; i < size; i++) {
-        configuration.add(Peer.deserialize(buffer));
+      if (Files.exists(configurationPath)) {
+        // recover from old configuration file
+        buffer = ByteBuffer.wrap(Files.readAllBytes(configurationPath));
+        int size = buffer.getInt();
+        for (int i = 0; i < size; i++) {
+          configuration.add(Peer.deserialize(buffer));
+        }
+        Files.delete(configurationPath);
+        persistConfiguration();
+      } else {
+        // recover from split configuration file
+        Path dirPath = Paths.get(storageDir);
+        List<Peer> tmpPeerList = getConfiguration(dirPath, CONFIGURATION_TMP_FILE_NAME);
+        tmpConfigurationUpdate(tmpPeerList);
+        List<Peer> peerList = getConfiguration(dirPath, CONFIGURATION_FILE_NAME);
+        configuration.addAll(peerList);
       }
       logger.info("Recover IoTConsensus server Impl, configuration: {}", configuration);
     } catch (IOException e) {
@@ -651,6 +649,42 @@ public class IoTConsensusServerImpl {
     }
   }
 
+  private void tmpConfigurationUpdate(List<Peer> tmpPeerList) throws IOException {
+    for (Peer peer : tmpPeerList) {
+      Path tmpConfigurationPath =
+          Paths.get(
+              new File(storageDir, peer.getNodeId() + "_" + CONFIGURATION_TMP_FILE_NAME)
+                  .getAbsolutePath());
+      Path configurationPath =
+          Paths.get(
+              new File(storageDir, peer.getNodeId() + "_" + CONFIGURATION_FILE_NAME)
+                  .getAbsolutePath());
+      Files.deleteIfExists(configurationPath);
+      Files.move(tmpConfigurationPath, configurationPath);
+    }
+  }
+
+  private List<Peer> getConfiguration(Path dirPath, String configurationFileName)
+      throws IOException {
+    ByteBuffer buffer;
+    List<Peer> tmpConfiguration = new ArrayList<>();
+    Path[] files =
+        Files.walk(dirPath)
+            .filter(Files::isRegularFile)
+            .filter(filePath -> filePath.getFileName().toString().contains(configurationFileName))
+            .toArray(Path[]::new);
+    for (Path file : files) {
+      buffer = ByteBuffer.wrap(Files.readAllBytes(file));
+      tmpConfiguration.add(Peer.deserialize(buffer));
+    }
+    return tmpConfiguration;
+  }
+
+  public void resetConfiguration(List<Peer> newConfiguration) {
+    configuration.clear();
+    configuration.addAll(newConfiguration);
+  }
+
   public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest(
       IConsensusRequest request) {
     if (request instanceof ComparableConsensusRequest) {
@@ -841,15 +875,20 @@ public class IoTConsensusServerImpl {
 
   private void serializeConfigurationAndFsyncToDisk(String configurationFileName)
       throws IOException {
-    FileOutputStream fileOutputStream =
-        new FileOutputStream(new File(storageDir, configurationFileName));
-    DataOutputStream outputStream = new DataOutputStream(fileOutputStream);
-    try {
-      serializeConfigurationTo(outputStream);
-    } finally {
-      fileOutputStream.flush();
-      fileOutputStream.getFD().sync();
-      outputStream.close();
+    for (Peer peer : configuration) {
+      String peerConfigurationFileName = peer.getNodeId() + "_" + configurationFileName;
+      FileOutputStream fileOutputStream =
+          new FileOutputStream(new File(storageDir, peerConfigurationFileName));
+      try (DataOutputStream outputStream = new DataOutputStream(fileOutputStream)) {
+        peer.serialize(outputStream);
+      } finally {
+        try {
+          fileOutputStream.flush();
+          fileOutputStream.getFD().sync();
+        } catch (IOException e) {
+          logger.error("Failed to fsync the configuration file {}", peerConfigurationFileName, e);
+        }
+      }
     }
   }
 
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index c627692b72d..86b77e16c8e 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -51,6 +51,7 @@ import org.apache.iotdb.consensus.ratis.metrics.RatisMetricsManager;
 import org.apache.iotdb.consensus.ratis.utils.Retriable;
 import org.apache.iotdb.consensus.ratis.utils.RetryPolicy;
 import org.apache.iotdb.consensus.ratis.utils.Utils;
+import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.commons.pool2.KeyedObjectPool;
@@ -537,6 +538,42 @@ class RatisConsensus implements IConsensus {
     sendReconfiguration(RaftGroup.valueOf(raftGroupId, newConfig));
   }
 
+  @Override
+  public TSStatus resetPeerList(ConsensusGroupId groupId, List<Peer> peers)
+      throws ConsensusException {
+    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
+    RaftGroup group = getGroupInfo(raftGroupId);
+
+    // pre-conditions: group exists and myself in this group
+    if (group == null || !group.getPeers().contains(myself)) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+
+    TSStatus writeResult = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+    for (Peer peer : peers) {
+      RaftPeer peerToRemove = Utils.fromPeerAndPriorityToRaftPeer(peer, DEFAULT_PRIORITY);
+      // pre-condition: peer is a member of groupId
+      if (!group.getPeers().contains(peerToRemove)) {
+        throw new PeerAlreadyInConsensusGroupException(groupId, peer);
+      }
+      // update group peer information
+      List<RaftPeer> newConfig =
+          group.getPeers().stream()
+              .filter(raftPeer -> !raftPeer.equals(peerToRemove))
+              .collect(Collectors.toList());
+      RaftClientReply reply = sendReconfiguration(RaftGroup.valueOf(raftGroupId, newConfig));
+      if (!reply.isSuccess()) {
+        throw new RatisRequestFailedException(reply.getException());
+      }
+      try {
+        writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
+      } catch (Exception e) {
+        throw new RatisRequestFailedException(e);
+      }
+    }
+    return writeResult;
+  }
+
   /** NOTICE: transferLeader *does not guarantee* the leader be transferred to newLeader. */
   @Override
   public void transferLeader(ConsensusGroupId groupId, Peer newLeader) throws ConsensusException {
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java
index 59f6cccd6f8..d9c0aca85de 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java
@@ -241,6 +241,12 @@ class SimpleConsensus implements IConsensus {
     return new ArrayList<>(stateMachineMap.keySet());
   }
 
+  @Override
+  public TSStatus resetPeerList(ConsensusGroupId groupId, List<Peer> peers)
+      throws ConsensusException {
+    throw new ConsensusException("SimpleConsensus does not support reset peer list");
+  }
+
   private String buildPeerDir(ConsensusGroupId groupId) {
     return storageDir + File.separator + groupId.getType().getValue() + "_" + groupId.getId();
   }

Reply via email to