This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new cabdec75b63 Fixed restart error when DataRegion is RatisConsensus (#12554)
cabdec75b63 is described below

commit cabdec75b637a9f235a55ec634f4b71b66765f65
Author: Potato <[email protected]>
AuthorDate: Tue May 21 21:19:14 2024 +0800

    Fixed restart error when DataRegion is RatisConsensus (#12554)
---
 .../iotdb/confignode/manager/ConfigManager.java    |   2 +-
 .../manager/consensus/ConsensusManager.java        |   7 ++
 .../org/apache/iotdb/consensus/IConsensus.java     |   3 +-
 .../apache/iotdb/consensus/ratis/DiskGuardian.java |  65 ++++++------
 .../iotdb/consensus/ratis/RatisConsensus.java      | 113 ++++++++++++---------
 5 files changed, 110 insertions(+), 80 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 0c407afec2f..2666008cd80 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -1280,7 +1280,7 @@ public class ConfigManager implements IManager {
 
     for (int i = 0; i < rpcTimeoutInMS / retryIntervalInMS; i++) {
       try {
-        if (consensusManager.get() == null) {
+        if (consensusManager.get() == null || 
!consensusManager.get().isInitialized()) {
           TimeUnit.MILLISECONDS.sleep(retryIntervalInMS);
         } else {
           // When add non Seed-ConfigNode to the ConfigNodeGroup, the 
parameter should be emptyList
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
index e28aee7fdab..361229f4435 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
@@ -77,6 +77,8 @@ public class ConsensusManager {
   private final IManager configManager;
   private IConsensus consensusImpl;
 
+  private boolean isInitialized;
+
   public ConsensusManager(IManager configManager, ConfigRegionStateMachine 
stateMachine) {
     this.configManager = configManager;
     setConsensusLayer(stateMachine);
@@ -101,6 +103,7 @@ public class ConsensusManager {
             "Something wrong happened while calling consensus layer's 
createLocalPeer API.", e);
       }
     }
+    isInitialized = true;
   }
 
   public void close() throws IOException {
@@ -444,4 +447,8 @@ public class ConsensusManager {
   public void manuallyTakeSnapshot() throws ConsensusException {
     consensusImpl.triggerSnapshot(ConfigNodeInfo.CONFIG_REGION_ID, true);
   }
+
+  public boolean isInitialized() {
+    return isInitialized;
+  }
 }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
index 8baaec89247..643c8360e84 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
@@ -43,7 +43,8 @@ import java.util.List;
 public interface IConsensus {
 
   /**
-   * Start the consensus module.
+   * Start the consensus module. Note: You should call this function 
immediately after initializing
+   * the instance, because calling other functions without start may produce 
unexpected errors
    *
    * @throws IOException when start consensus errors
    */
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/DiskGuardian.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/DiskGuardian.java
index faa0f317d1a..ae6e597aafb 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/DiskGuardian.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/DiskGuardian.java
@@ -143,7 +143,7 @@ class DiskGuardian {
   void start() {
     // first schedule the snapshot daemon
     ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
-        workerThread, this::snapshotDaemon, 0L, daemonIntervalMs, 
TimeUnit.SECONDS);
+        workerThread, this::snapshotDaemon, daemonIntervalMs, 
daemonIntervalMs, TimeUnit.SECONDS);
 
     // then schedule all checker daemons
     snapshotArbitrators.forEach(
@@ -151,7 +151,7 @@ class DiskGuardian {
             ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
                 workerThread,
                 () -> checkerDaemon(checkers),
-                0L,
+                daemonIntervalMs,
                 interval.toLong(TimeUnit.SECONDS),
                 TimeUnit.SECONDS));
   }
@@ -175,26 +175,31 @@ class DiskGuardian {
     if (isStopped.get()) {
       return;
     }
-    for (RaftGroupId groupId : serverRef.get().getServer().getGroupIds()) {
-      if (getSnapshotFlag(groupId).get()) {
-        try {
-          
serverRef.get().triggerSnapshot(Utils.fromRaftGroupIdToConsensusGroupId(groupId),
 false);
-          final boolean flagCleared = 
snapshotFlag.get(groupId).compareAndSet(true, false);
-          if (!flagCleared) {
-            logger.warn(
-                "{}: clear snapshot flag failed for group {}, please check the 
related implementation",
+    try {
+      for (RaftGroupId groupId : serverRef.get().getServer().getGroupIds()) {
+        if (getSnapshotFlag(groupId).get()) {
+          try {
+            serverRef
+                .get()
+                
.triggerSnapshot(Utils.fromRaftGroupIdToConsensusGroupId(groupId), false);
+            final boolean flagCleared = 
snapshotFlag.get(groupId).compareAndSet(true, false);
+            if (!flagCleared) {
+              logger.info(
+                  "{}: clear snapshot flag failed for group {}, please check 
the related implementation",
+                  this,
+                  groupId);
+            }
+          } catch (ConsensusException e) {
+            logger.info(
+                "{} take snapshot failed for group {} due to {}. Disk file 
status {}",
                 this,
-                groupId);
+                groupId,
+                e,
+                getLatestSummary(groupId).orElse(null));
           }
-        } catch (ConsensusException e) {
-          logger.warn(
-              "{} take snapshot failed for group {} due to {}. Disk file 
status {}",
-              this,
-              groupId,
-              e,
-              getLatestSummary(groupId).orElse(null));
         }
       }
+    } catch (IOException ignore) {
     }
   }
 
@@ -203,18 +208,21 @@ class DiskGuardian {
     if (isStopped.get()) {
       return;
     }
-    for (RaftGroupId groupId : serverRef.get().getServer().getGroupIds()) {
-      final Optional<RaftLogSummary> summary = getLatestSummary(groupId);
-      if (summary.isPresent()) {
-        final Optional<Boolean> anyCheckerPositive =
-            checkerList.stream()
-                .map(checker -> checker.test(summary.get()))
-                .filter(Boolean::booleanValue)
-                .findAny();
-        if (anyCheckerPositive.isPresent()) {
-          getSnapshotFlag(groupId).set(true);
+    try {
+      for (RaftGroupId groupId : serverRef.get().getServer().getGroupIds()) {
+        final Optional<RaftLogSummary> summary = getLatestSummary(groupId);
+        if (summary.isPresent()) {
+          final Optional<Boolean> anyCheckerPositive =
+              checkerList.stream()
+                  .map(checker -> checker.test(summary.get()))
+                  .filter(Boolean::booleanValue)
+                  .findAny();
+          if (anyCheckerPositive.isPresent()) {
+            getSnapshotFlag(groupId).set(true);
+          }
         }
       }
+    } catch (IOException ignore) {
     }
   }
 
@@ -240,7 +248,6 @@ class DiskGuardian {
                         .getCurrentDir();
                 return new RaftLogSummary(gid, root);
               } catch (IOException e) {
-                logger.warn("{}: group not exists for {} and caught exception 
", this, groupId, e);
                 return null;
               }
             });
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index de646de2334..45c82332f55 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -83,6 +83,7 @@ import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
+import org.apache.ratis.util.MemoizedCheckedSupplier;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.function.CheckedSupplier;
 import org.slf4j.Logger;
@@ -115,8 +116,7 @@ class RatisConsensus implements IConsensus {
   private final RaftPeer myself;
 
   private final File storageDir;
-
-  private final RaftServer server;
+  private final MemoizedCheckedSupplier<RaftServer, IOException> server;
 
   private final RaftProperties properties = new RaftProperties();
   private final RaftClientRpc clientRpc;
@@ -144,8 +144,7 @@ class RatisConsensus implements IConsensus {
 
   private final ConcurrentHashMap<ConsensusGroupId, AtomicBoolean> 
canServeStaleRead;
 
-  public RatisConsensus(ConsensusConfig config, IStateMachine.Registry 
registry)
-      throws IOException {
+  public RatisConsensus(ConsensusConfig config, IStateMachine.Registry 
registry) {
     myself =
         Utils.fromNodeInfoAndPriorityToRaftPeer(
             config.getThisNodeId(), config.getThisNodeEndPoint(), 
DEFAULT_PRIORITY);
@@ -195,24 +194,28 @@ class RatisConsensus implements IConsensus {
 
     clientRpc = new GrpcFactory(new 
Parameters()).newRaftClientRpc(ClientId.randomId(), properties);
 
+    // do not build server in constructor in case stateMachine is not ready
     server =
-        RaftServer.newBuilder()
-            .setServerId(myself.getId())
-            .setProperties(properties)
-            .setOption(RaftStorage.StartupOption.RECOVER)
-            .setStateMachineRegistry(
-                raftGroupId ->
-                    new ApplicationStateMachineProxy(
-                        
registry.apply(Utils.fromRaftGroupIdToConsensusGroupId(raftGroupId)),
-                        raftGroupId,
-                        this::onLeaderChanged))
-            .build();
+        MemoizedCheckedSupplier.valueOf(
+            () ->
+                RaftServer.newBuilder()
+                    .setServerId(myself.getId())
+                    .setProperties(properties)
+                    .setOption(RaftStorage.StartupOption.RECOVER)
+                    .setStateMachineRegistry(
+                        raftGroupId ->
+                            new ApplicationStateMachineProxy(
+                                registry.apply(
+                                    
Utils.fromRaftGroupIdToConsensusGroupId(raftGroupId)),
+                                raftGroupId,
+                                this::onLeaderChanged))
+                    .build());
   }
 
   @Override
   public synchronized void start() throws IOException {
     MetricService.getInstance().addMetricSet(this.ratisMetricSet);
-    server.start();
+    server.get().start();
     registerAndStartDiskGuardian();
   }
 
@@ -225,7 +228,7 @@ class RatisConsensus implements IConsensus {
       Thread.currentThread().interrupt();
     } finally {
       clientManager.close();
-      server.close();
+      server.get().close();
       MetricService.getInstance().removeMetricSet(this.ratisMetricSet);
     }
   }
@@ -252,7 +255,7 @@ class RatisConsensus implements IConsensus {
   }
 
   private RaftClientReply writeLocallyWithRetry(RaftClientRequest request) 
throws IOException {
-    return writeWithRetry(() -> server.submitClientRequest(request));
+    return writeWithRetry(() -> server.get().submitClientRequest(request));
   }
 
   private RaftClientReply writeRemotelyWithRetry(RatisClient client, Message 
message)
@@ -288,8 +291,12 @@ class RatisConsensus implements IConsensus {
     Message message = new RequestMessage(request);
 
     // 1. first try the local server
-    RaftClientRequest clientRequest =
-        buildRawRequest(raftGroupId, message, 
RaftClientRequest.writeRequestType());
+    RaftClientRequest clientRequest;
+    try {
+      clientRequest = buildRawRequest(raftGroupId, message, 
RaftClientRequest.writeRequestType());
+    } catch (IOException e) {
+      throw new RatisRequestFailedException(e);
+    }
 
     RaftPeer suggestedLeader = null;
     if ((isLeader(groupId) || raftGroup.getPeers().size() == 1)
@@ -392,7 +399,7 @@ class RatisConsensus implements IConsensus {
           Retriable.attempt(
               () -> {
                 try {
-                  return server.submitClientRequest(request);
+                  return server.get().submitClientRequest(request);
                 } catch (
                     IOException
                         ioe) { // IOE indicates some unexpected errors, say 
StatusRuntimeException
@@ -402,7 +409,7 @@ class RatisConsensus implements IConsensus {
                     // We can still retry in case it's a temporary network 
partition.
                     return RaftClientReply.newBuilder()
                         .setClientId(localFakeId)
-                        .setServerId(server.getId())
+                        .setServerId(server.get().getId())
                         .setGroupId(request.getRaftGroupId())
                         .setException(
                             new ReadIndexException(
@@ -440,9 +447,11 @@ class RatisConsensus implements IConsensus {
     RaftGroup group = buildRaftGroup(groupId, peers);
     try {
       RaftClientReply reply =
-          server.groupManagement(
-              GroupManagementRequest.newAdd(
-                  localFakeId, myself.getId(), 
localFakeCallId.incrementAndGet(), group, true));
+          server
+              .get()
+              .groupManagement(
+                  GroupManagementRequest.newAdd(
+                      localFakeId, myself.getId(), 
localFakeCallId.incrementAndGet(), group, true));
       if (!reply.isSuccess()) {
         throw new RatisRequestFailedException(reply.getException());
       }
@@ -468,14 +477,16 @@ class RatisConsensus implements IConsensus {
     RaftClientReply reply;
     try {
       reply =
-          server.groupManagement(
-              GroupManagementRequest.newRemove(
-                  localFakeId,
-                  myself.getId(),
-                  localFakeCallId.incrementAndGet(),
-                  raftGroupId,
-                  true,
-                  false));
+          server
+              .get()
+              .groupManagement(
+                  GroupManagementRequest.newRemove(
+                      localFakeId,
+                      myself.getId(),
+                      localFakeCallId.incrementAndGet(),
+                      raftGroupId,
+                      true,
+                      false));
       if (!reply.isSuccess()) {
         throw new RatisRequestFailedException(reply.getException());
       }
@@ -602,7 +613,7 @@ class RatisConsensus implements IConsensus {
   public boolean isLeader(ConsensusGroupId groupId) {
     RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
     try {
-      return server.getDivision(raftGroupId).getInfo().isLeader();
+      return server.get().getDivision(raftGroupId).getInfo().isLeader();
     } catch (IOException exception) {
       // if the read fails, simply return not leader
       logger.info("isLeader request failed with exception: ", exception);
@@ -614,7 +625,7 @@ class RatisConsensus implements IConsensus {
   public boolean isLeaderReady(ConsensusGroupId groupId) {
     RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
     try {
-      return server.getDivision(raftGroupId).getInfo().isLeaderReady();
+      return server.get().getDivision(raftGroupId).getInfo().isLeaderReady();
     } catch (IOException exception) {
       // if the read fails, simply return not ready
       logger.info("isLeaderReady request failed with exception: ", exception);
@@ -626,7 +637,7 @@ class RatisConsensus implements IConsensus {
   public long getLogicalClock(ConsensusGroupId groupId) {
     RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
     try {
-      return server.getDivision(raftGroupId).getInfo().getCurrentTerm();
+      return server.get().getDivision(raftGroupId).getInfo().getCurrentTerm();
     } catch (IOException exception) {
       // if the read fails, simply return 0
       logger.info("getLogicalClock request failed with exception: ", 
exception);
@@ -637,7 +648,7 @@ class RatisConsensus implements IConsensus {
   private boolean waitUntilLeaderReady(RaftGroupId groupId) {
     DivisionInfo divisionInfo;
     try {
-      divisionInfo = server.getDivision(groupId).getInfo();
+      divisionInfo = server.get().getDivision(groupId).getInfo();
     } catch (IOException e) {
       // if the read fails, simply return not leader
       logger.info("isLeaderReady checking failed with exception: ", e);
@@ -685,7 +696,7 @@ class RatisConsensus implements IConsensus {
     RaftPeerId leaderId;
 
     try {
-      leaderId = server.getDivision(raftGroupId).getInfo().getLeaderId();
+      leaderId = server.get().getDivision(raftGroupId).getInfo().getLeaderId();
     } catch (IOException e) {
       logger.warn("fetch division info for group " + groupId + " failed due 
to: ", e);
       return null;
@@ -700,10 +711,15 @@ class RatisConsensus implements IConsensus {
   @Override
   public List<ConsensusGroupId> getAllConsensusGroupIds() {
     List<ConsensusGroupId> ids = new ArrayList<>();
-    server
-        .getGroupIds()
-        .forEach(groupId -> 
ids.add(Utils.fromRaftGroupIdToConsensusGroupId(groupId)));
-    return ids;
+    try {
+      server
+          .get()
+          .getGroupIds()
+          .forEach(groupId -> 
ids.add(Utils.fromRaftGroupIdToConsensusGroupId(groupId)));
+      return ids;
+    } catch (IOException e) {
+      return Collections.emptyList();
+    }
   }
 
   @Override
@@ -757,7 +773,7 @@ class RatisConsensus implements IConsensus {
 
     final RaftClientReply reply;
     try {
-      reply = server.snapshotManagement(request);
+      reply = server.get().snapshotManagement(request);
       if (!reply.isSuccess()) {
         throw new RatisRequestFailedException(reply.getException());
       }
@@ -785,9 +801,9 @@ class RatisConsensus implements IConsensus {
   }
 
   private RaftClientRequest buildRawRequest(
-      RaftGroupId groupId, Message message, RaftClientRequest.Type type) {
+      RaftGroupId groupId, Message message, RaftClientRequest.Type type) 
throws IOException {
     return RaftClientRequest.newBuilder()
-        .setServerId(server.getId())
+        .setServerId(server.get().getId())
         .setClientId(localFakeId)
         .setCallId(localFakeCallId.incrementAndGet())
         .setGroupId(groupId)
@@ -799,7 +815,7 @@ class RatisConsensus implements IConsensus {
   private RaftGroup getGroupInfo(RaftGroupId raftGroupId) {
     RaftGroup raftGroup = null;
     try {
-      raftGroup = server.getDivision(raftGroupId).getGroup();
+      raftGroup = server.get().getDivision(raftGroupId).getGroup();
       RaftGroup lastSeenGroup = lastSeen.getOrDefault(raftGroupId, null);
       if (lastSeenGroup != null && !lastSeenGroup.equals(raftGroup)) {
         // delete the pooled raft-client of the out-dated group and cache the 
latest
@@ -854,9 +870,8 @@ class RatisConsensus implements IConsensus {
             });
   }
 
-  @TestOnly
-  public RaftServer getServer() {
-    return server;
+  public RaftServer getServer() throws IOException {
+    return server.get();
   }
 
   @TestOnly

Reply via email to