This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
     new 7455e0002c8 RegionMigrate: Fix migrating region with ratisConsensus cause the region is unavailable. (#13178) (#13230)
7455e0002c8 is described below

commit 7455e0002c822fe72c9b5073274db4e7158857df
Author: 133tosakarin <[email protected]>
AuthorDate: Tue Aug 20 10:37:53 2024 +0800

    RegionMigrate: Fix migrating region with ratisConsensus cause the region is unavailable. (#13178) (#13230)
---
 .../procedure/env/RegionMaintainHandler.java       | 49 +++++++++-----
 .../impl/region/RemoveRegionPeerProcedure.java     | 23 ++++---
 .../apache/iotdb/consensus/ratis/RatisClient.java  | 78 ++++++++++++++++++++++
 .../iotdb/consensus/ratis/RatisConsensus.java      | 33 +++++++--
 4 files changed, 152 insertions(+), 31 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
index 4b51208e79f..490be8be342 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
@@ -665,23 +665,19 @@ public class RegionMaintainHandler {
    * @param regionId The region to be migrated
    * @param originalDataNode The DataNode where the region locates
    */
-  public void transferRegionLeader(TConsensusGroupId regionId, 
TDataNodeLocation originalDataNode)
+  public void transferRegionLeader(
+      TConsensusGroupId regionId, TDataNodeLocation originalDataNode, 
TDataNodeLocation coodinator)
       throws ProcedureException, InterruptedException {
     // find new leader
-    final int findNewLeaderTimeLimitSecond = 10;
-    long startTime = System.nanoTime();
     Optional<TDataNodeLocation> newLeaderNode = Optional.empty();
-    while (System.nanoTime() - startTime < 
TimeUnit.SECONDS.toNanos(findNewLeaderTimeLimitSecond)) {
-      newLeaderNode = filterDataNodeWithOtherRegionReplica(regionId, 
originalDataNode);
-      if (newLeaderNode.isPresent()) {
-        break;
-      }
+    List<TDataNodeLocation> excludeDataNode = new ArrayList<>();
+    excludeDataNode.add(originalDataNode);
+    excludeDataNode.add(coodinator);
+    newLeaderNode = filterDataNodeWithOtherRegionReplica(regionId, 
excludeDataNode);
+    if (!newLeaderNode.isPresent()) {
+      // If we have no choice, we use it
+      newLeaderNode = Optional.of(coodinator);
     }
-    newLeaderNode.orElseThrow(
-        () ->
-            new ProcedureException(
-                "Cannot find the new leader after " + 
findNewLeaderTimeLimitSecond + " seconds"));
-
     // ratis needs DataNode to do election by itself
     long timestamp = System.nanoTime();
     if (TConsensusGroupType.SchemaRegion.equals(regionId.getType())
@@ -693,6 +689,14 @@ public class RegionMaintainHandler {
           (CONF.getSchemaRegionRatisRpcLeaderElectionTimeoutMaxMs()
                   + CONF.getSchemaRegionRatisRpcLeaderElectionTimeoutMinMs())
               / 2;
+      Integer leaderId = 
configManager.getLoadManager().getRegionLeaderMap().get(regionId);
+
+      if (leaderId != -1) {
+        // The migrated node is not leader, so we don't need to transfer 
temporarily
+        if (originalDataNode.getDataNodeId() != leaderId) {
+          return;
+        }
+      }
       while (true) {
         TRegionLeaderChangeResp resp =
             SyncDataNodeClientPool.getInstance()
@@ -742,12 +746,26 @@ public class RegionMaintainHandler {
    */
   public Optional<TDataNodeLocation> filterDataNodeWithOtherRegionReplica(
       TConsensusGroupId regionId, TDataNodeLocation filterLocation) {
+    List<TDataNodeLocation> filterLocations = 
Collections.singletonList(filterLocation);
+    return filterDataNodeWithOtherRegionReplica(regionId, filterLocations);
+  }
+
+  public Optional<TDataNodeLocation> filterDataNodeWithOtherRegionReplica(
+      TConsensusGroupId regionId, List<TDataNodeLocation> filterLocations) {
     return filterDataNodeWithOtherRegionReplica(
-        regionId, filterLocation, NodeStatus.Running, NodeStatus.ReadOnly);
+        regionId, filterLocations, NodeStatus.Running, NodeStatus.ReadOnly);
   }
 
   public Optional<TDataNodeLocation> filterDataNodeWithOtherRegionReplica(
       TConsensusGroupId regionId, TDataNodeLocation filterLocation, 
NodeStatus... allowingStatus) {
+    List<TDataNodeLocation> excludeLocations = 
Collections.singletonList(filterLocation);
+    return filterDataNodeWithOtherRegionReplica(regionId, excludeLocations, 
allowingStatus);
+  }
+
+  public Optional<TDataNodeLocation> filterDataNodeWithOtherRegionReplica(
+      TConsensusGroupId regionId,
+      List<TDataNodeLocation> excludeLocations,
+      NodeStatus... allowingStatus) {
     List<TDataNodeLocation> regionLocations = findRegionLocations(regionId);
     if (regionLocations.isEmpty()) {
       LOGGER.warn("Cannot find DataNodes contain the given region: {}", 
regionId);
@@ -762,11 +780,10 @@ public class RegionMaintainHandler {
             .collect(Collectors.toList());
     Collections.shuffle(aliveDataNodes);
     for (TDataNodeLocation aliveDataNode : aliveDataNodes) {
-      if (regionLocations.contains(aliveDataNode) && 
!aliveDataNode.equals(filterLocation)) {
+      if (regionLocations.contains(aliveDataNode) && 
!excludeLocations.contains(aliveDataNode)) {
         return Optional.of(aliveDataNode);
       }
     }
-
     return Optional.empty();
   }
 
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
index 166714e90fc..463492357d8 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
 import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
-import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler;
@@ -68,12 +67,22 @@ public class RemoveRegionPeerProcedure
       TConsensusGroupId consensusGroupId,
       TDataNodeLocation coordinator,
       TDataNodeLocation targetDataNode) {
-    super();
     this.consensusGroupId = consensusGroupId;
     this.coordinator = coordinator;
     this.targetDataNode = targetDataNode;
   }
 
+  private void handleTransferLeader(RegionMaintainHandler handler)
+      throws ProcedureException, InterruptedException {
+    LOGGER.info(
+        "[pid{}][RemoveRegion] started, region {} will be removed from 
DataNode {}.",
+        getProcId(),
+        consensusGroupId.getId(),
+        targetDataNode.getDataNodeId());
+    handler.forceUpdateRegionCache(consensusGroupId, targetDataNode, 
RegionStatus.Removing);
+    handler.transferRegionLeader(consensusGroupId, targetDataNode, 
coordinator);
+  }
+
   @Override
   protected Flow executeFromState(ConfigNodeProcedureEnv env, 
RemoveRegionPeerState state)
       throws ProcedureSuspendedException, ProcedureYieldException, 
InterruptedException {
@@ -85,14 +94,8 @@ public class RemoveRegionPeerProcedure
     try {
       switch (state) {
         case TRANSFER_REGION_LEADER:
-          LOGGER.info(
-              "[pid{}][RemoveRegion] started, region {} will be removed from 
DataNode {}.",
-              getProcId(),
-              consensusGroupId.getId(),
-              targetDataNode.getDataNodeId());
-          handler.forceUpdateRegionCache(consensusGroupId, targetDataNode, 
RegionStatus.Removing);
-          handler.transferRegionLeader(consensusGroupId, targetDataNode);
-          KillPoint.setKillPoint(state);
+          handleTransferLeader(handler);
+          setKillPoint(state);
           setNextState(REMOVE_REGION_PEER);
           break;
         case REMOVE_REGION_PEER:
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java
index 21f3f2881a2..7c03ddf042e 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java
@@ -29,8 +29,16 @@ import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.RaftClientRpc;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
+import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.protocol.exceptions.RaftException;
+import org.apache.ratis.protocol.exceptions.ReconfigurationInProgressException;
+import org.apache.ratis.protocol.exceptions.ReconfigurationTimeoutException;
+import org.apache.ratis.protocol.exceptions.ServerNotReadyException;
+import org.apache.ratis.protocol.exceptions.TimeoutIOException;
 import org.apache.ratis.retry.ExponentialBackoffRetry;
+import org.apache.ratis.retry.RetryPolicies;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
 import org.apache.ratis.util.TimeDuration;
@@ -120,6 +128,48 @@ class RatisClient implements AutoCloseable {
     }
   }
 
+  static class EndlessRetryFactory extends BaseClientFactory<RaftGroup, 
RatisClient> {
+
+    private final RaftProperties raftProperties;
+    private final RaftClientRpc clientRpc;
+    private final RatisConfig.Client config;
+
+    public EndlessRetryFactory(
+        ClientManager<RaftGroup, RatisClient> clientManager,
+        RaftProperties raftProperties,
+        RaftClientRpc clientRpc,
+        RatisConfig.Client config) {
+      super(clientManager);
+      this.raftProperties = raftProperties;
+      this.clientRpc = clientRpc;
+      this.config = config;
+    }
+
+    @Override
+    public void destroyObject(RaftGroup key, PooledObject<RatisClient> 
pooledObject) {
+      pooledObject.getObject().invalidate();
+    }
+
+    @Override
+    public PooledObject<RatisClient> makeObject(RaftGroup group) {
+      return new DefaultPooledObject<>(
+          new RatisClient(
+              group,
+              RaftClient.newBuilder()
+                  .setProperties(raftProperties)
+                  .setRaftGroup(group)
+                  .setRetryPolicy(new RatisEndlessRetryPolicy())
+                  .setClientRpc(clientRpc)
+                  .build(),
+              clientManager));
+    }
+
+    @Override
+    public boolean validateObject(RaftGroup key, PooledObject<RatisClient> 
pooledObject) {
+      return true;
+    }
+  }
+
   /**
    * RatisRetryPolicy is similar to ExceptionDependentRetry 1. By default, use
    * ExponentialBackoffRetry to handle request failure 2. If unexpected 
IOException is caught,
@@ -167,4 +217,32 @@ class RatisClient implements AutoCloseable {
       return defaultPolicy.handleAttemptFailure(event);
     }
   }
+
+  /** This policy is used to raft configuration change */
+  private static class RatisEndlessRetryPolicy implements RetryPolicy {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(RatisEndlessRetryPolicy.class);
+    private final RetryPolicy defaultPolicy;
+
+    RatisEndlessRetryPolicy() {
+      defaultPolicy =
+          RetryPolicies.retryForeverWithSleep(TimeDuration.valueOf(2, 
TimeUnit.SECONDS));
+    }
+
+    @Override
+    public Action handleAttemptFailure(Event event) {
+      if (event.getCause() == null
+          || event.getCause() instanceof ReconfigurationInProgressException
+          || event.getCause() instanceof TimeoutIOException
+          || event.getCause() instanceof LeaderSteppingDownException
+          || event.getCause() instanceof ReconfigurationTimeoutException
+          || event.getCause() instanceof ServerNotReadyException
+          || event.getCause() instanceof NotLeaderException
+          || event.getCause() instanceof LeaderNotReadyException) {
+        return defaultPolicy.handleAttemptFailure(event);
+      }
+
+      return defaultPolicy.handleAttemptFailure(event);
+    }
+  }
 }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index d4bb8d55c8a..f346d90ac35 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -122,6 +122,7 @@ class RatisConsensus implements IConsensus {
   private final RaftClientRpc clientRpc;
 
   private final IClientManager<RaftGroup, RatisClient> clientManager;
+  private final IClientManager<RaftGroup, RatisClient> 
reconfigurationClientManager;
 
   private final DiskGuardian diskGuardian;
 
@@ -190,7 +191,10 @@ class RatisConsensus implements IConsensus {
 
     clientManager =
         new IClientManager.Factory<RaftGroup, RatisClient>()
-            .createClientManager(new RatisClientPoolFactory());
+            .createClientManager(new RatisClientPoolFactory(false));
+    reconfigurationClientManager =
+        new IClientManager.Factory<RaftGroup, RatisClient>()
+            .createClientManager(new RatisClientPoolFactory(true));
 
     clientRpc = new GrpcFactory(new 
Parameters()).newRaftClientRpc(ClientId.randomId(), properties);
 
@@ -228,6 +232,7 @@ class RatisConsensus implements IConsensus {
       Thread.currentThread().interrupt();
     } finally {
       clientManager.close();
+      reconfigurationClientManager.close();
       server.get().close();
       MetricService.getInstance().removeMetricSet(this.ratisMetricSet);
     }
@@ -581,9 +586,7 @@ class RatisConsensus implements IConsensus {
     final RaftGroup raftGroup =
         Optional.ofNullable(getGroupInfo(raftGroupId))
             .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
-
     final RaftPeer newRaftLeader = 
Utils.fromPeerAndPriorityToRaftPeer(newLeader, DEFAULT_PRIORITY);
-
     final RaftClientReply reply;
     try {
       reply = transferLeader(raftGroup, newRaftLeader);
@@ -839,6 +842,7 @@ class RatisConsensus implements IConsensus {
       if (lastSeenGroup != null && !lastSeenGroup.equals(raftGroup)) {
         // delete the pooled raft-client of the out-dated group and cache the 
latest
         clientManager.clear(lastSeenGroup);
+        reconfigurationClientManager.clear(lastSeenGroup);
         lastSeen.put(raftGroupId, raftGroup);
       }
     } catch (IOException e) {
@@ -863,11 +867,21 @@ class RatisConsensus implements IConsensus {
     }
   }
 
+  private RatisClient getConfigurationRaftClient(RaftGroup group) throws 
ClientManagerException {
+    try {
+      return reconfigurationClientManager.borrowClient(group);
+    } catch (ClientManagerException e) {
+      logger.error("Borrow client from pool for group {} failed.", group, e);
+      // rethrow the exception
+      throw e;
+    }
+  }
+
   private RaftClientReply sendReconfiguration(RaftGroup newGroupConf)
       throws RatisRequestFailedException {
     // notify the group leader of configuration change
     RaftClientReply reply;
-    try (RatisClient client = getRaftClient(newGroupConf)) {
+    try (RatisClient client = getConfigurationRaftClient(newGroupConf)) {
       reply =
           client.getRaftClient().admin().setConfiguration(new 
ArrayList<>(newGroupConf.getPeers()));
       if (!reply.isSuccess()) {
@@ -900,12 +914,21 @@ class RatisConsensus implements IConsensus {
 
   private class RatisClientPoolFactory implements 
IClientPoolFactory<RaftGroup, RatisClient> {
 
+    private final boolean isReconfiguration;
+
+    RatisClientPoolFactory(boolean isReconfiguration) {
+      this.isReconfiguration = isReconfiguration;
+    }
+
     @Override
     public KeyedObjectPool<RaftGroup, RatisClient> createClientPool(
         ClientManager<RaftGroup, RatisClient> manager) {
       GenericKeyedObjectPool<RaftGroup, RatisClient> clientPool =
           new GenericKeyedObjectPool<>(
-              new RatisClient.Factory(manager, properties, clientRpc, 
config.getClient()),
+              isReconfiguration
+                  ? new RatisClient.EndlessRetryFactory(
+                      manager, properties, clientRpc, config.getClient())
+                  : new RatisClient.Factory(manager, properties, clientRpc, 
config.getClient()),
               new ClientPoolProperty.Builder<RatisClient>()
                   
.setMaxClientNumForEachNode(config.getClient().getMaxClientNumForEachNode())
                   .build()

Reply via email to