This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new 7455e0002c8 RegionMigrate: Fix migrating region with ratisConsensus cause the region is unavailable. (#13178) (#13230)
7455e0002c8 is described below
commit 7455e0002c822fe72c9b5073274db4e7158857df
Author: 133tosakarin <[email protected]>
AuthorDate: Tue Aug 20 10:37:53 2024 +0800
RegionMigrate: Fix migrating region with ratisConsensus cause the region is unavailable. (#13178) (#13230)
---
.../procedure/env/RegionMaintainHandler.java | 49 +++++++++-----
.../impl/region/RemoveRegionPeerProcedure.java | 23 ++++---
.../apache/iotdb/consensus/ratis/RatisClient.java | 78 ++++++++++++++++++++++
.../iotdb/consensus/ratis/RatisConsensus.java | 33 +++++++--
4 files changed, 152 insertions(+), 31 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
index 4b51208e79f..490be8be342 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
@@ -665,23 +665,19 @@ public class RegionMaintainHandler {
* @param regionId The region to be migrated
* @param originalDataNode The DataNode where the region locates
*/
- public void transferRegionLeader(TConsensusGroupId regionId,
TDataNodeLocation originalDataNode)
+ public void transferRegionLeader(
+ TConsensusGroupId regionId, TDataNodeLocation originalDataNode,
TDataNodeLocation coodinator)
throws ProcedureException, InterruptedException {
// find new leader
- final int findNewLeaderTimeLimitSecond = 10;
- long startTime = System.nanoTime();
Optional<TDataNodeLocation> newLeaderNode = Optional.empty();
- while (System.nanoTime() - startTime <
TimeUnit.SECONDS.toNanos(findNewLeaderTimeLimitSecond)) {
- newLeaderNode = filterDataNodeWithOtherRegionReplica(regionId,
originalDataNode);
- if (newLeaderNode.isPresent()) {
- break;
- }
+ List<TDataNodeLocation> excludeDataNode = new ArrayList<>();
+ excludeDataNode.add(originalDataNode);
+ excludeDataNode.add(coodinator);
+ newLeaderNode = filterDataNodeWithOtherRegionReplica(regionId,
excludeDataNode);
+ if (!newLeaderNode.isPresent()) {
+ // If we have no choice, we use it
+ newLeaderNode = Optional.of(coodinator);
}
- newLeaderNode.orElseThrow(
- () ->
- new ProcedureException(
- "Cannot find the new leader after " +
findNewLeaderTimeLimitSecond + " seconds"));
-
// ratis needs DataNode to do election by itself
long timestamp = System.nanoTime();
if (TConsensusGroupType.SchemaRegion.equals(regionId.getType())
@@ -693,6 +689,14 @@ public class RegionMaintainHandler {
(CONF.getSchemaRegionRatisRpcLeaderElectionTimeoutMaxMs()
+ CONF.getSchemaRegionRatisRpcLeaderElectionTimeoutMinMs())
/ 2;
+ Integer leaderId =
configManager.getLoadManager().getRegionLeaderMap().get(regionId);
+
+ if (leaderId != -1) {
+ // The migrated node is not leader, so we don't need to transfer
temporarily
+ if (originalDataNode.getDataNodeId() != leaderId) {
+ return;
+ }
+ }
while (true) {
TRegionLeaderChangeResp resp =
SyncDataNodeClientPool.getInstance()
@@ -742,12 +746,26 @@ public class RegionMaintainHandler {
*/
public Optional<TDataNodeLocation> filterDataNodeWithOtherRegionReplica(
TConsensusGroupId regionId, TDataNodeLocation filterLocation) {
+ List<TDataNodeLocation> filterLocations =
Collections.singletonList(filterLocation);
+ return filterDataNodeWithOtherRegionReplica(regionId, filterLocations);
+ }
+
+ public Optional<TDataNodeLocation> filterDataNodeWithOtherRegionReplica(
+ TConsensusGroupId regionId, List<TDataNodeLocation> filterLocations) {
return filterDataNodeWithOtherRegionReplica(
- regionId, filterLocation, NodeStatus.Running, NodeStatus.ReadOnly);
+ regionId, filterLocations, NodeStatus.Running, NodeStatus.ReadOnly);
}
public Optional<TDataNodeLocation> filterDataNodeWithOtherRegionReplica(
TConsensusGroupId regionId, TDataNodeLocation filterLocation,
NodeStatus... allowingStatus) {
+ List<TDataNodeLocation> excludeLocations =
Collections.singletonList(filterLocation);
+ return filterDataNodeWithOtherRegionReplica(regionId, excludeLocations,
allowingStatus);
+ }
+
+ public Optional<TDataNodeLocation> filterDataNodeWithOtherRegionReplica(
+ TConsensusGroupId regionId,
+ List<TDataNodeLocation> excludeLocations,
+ NodeStatus... allowingStatus) {
List<TDataNodeLocation> regionLocations = findRegionLocations(regionId);
if (regionLocations.isEmpty()) {
LOGGER.warn("Cannot find DataNodes contain the given region: {}",
regionId);
@@ -762,11 +780,10 @@ public class RegionMaintainHandler {
.collect(Collectors.toList());
Collections.shuffle(aliveDataNodes);
for (TDataNodeLocation aliveDataNode : aliveDataNodes) {
- if (regionLocations.contains(aliveDataNode) &&
!aliveDataNode.equals(filterLocation)) {
+ if (regionLocations.contains(aliveDataNode) &&
!excludeLocations.contains(aliveDataNode)) {
return Optional.of(aliveDataNode);
}
}
-
return Optional.empty();
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
index 166714e90fc..463492357d8 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
-import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler;
@@ -68,12 +67,22 @@ public class RemoveRegionPeerProcedure
TConsensusGroupId consensusGroupId,
TDataNodeLocation coordinator,
TDataNodeLocation targetDataNode) {
- super();
this.consensusGroupId = consensusGroupId;
this.coordinator = coordinator;
this.targetDataNode = targetDataNode;
}
+ private void handleTransferLeader(RegionMaintainHandler handler)
+ throws ProcedureException, InterruptedException {
+ LOGGER.info(
+ "[pid{}][RemoveRegion] started, region {} will be removed from
DataNode {}.",
+ getProcId(),
+ consensusGroupId.getId(),
+ targetDataNode.getDataNodeId());
+ handler.forceUpdateRegionCache(consensusGroupId, targetDataNode,
RegionStatus.Removing);
+ handler.transferRegionLeader(consensusGroupId, targetDataNode,
coordinator);
+ }
+
@Override
protected Flow executeFromState(ConfigNodeProcedureEnv env,
RemoveRegionPeerState state)
throws ProcedureSuspendedException, ProcedureYieldException,
InterruptedException {
@@ -85,14 +94,8 @@ public class RemoveRegionPeerProcedure
try {
switch (state) {
case TRANSFER_REGION_LEADER:
- LOGGER.info(
- "[pid{}][RemoveRegion] started, region {} will be removed from
DataNode {}.",
- getProcId(),
- consensusGroupId.getId(),
- targetDataNode.getDataNodeId());
- handler.forceUpdateRegionCache(consensusGroupId, targetDataNode,
RegionStatus.Removing);
- handler.transferRegionLeader(consensusGroupId, targetDataNode);
- KillPoint.setKillPoint(state);
+ handleTransferLeader(handler);
+ setKillPoint(state);
setNextState(REMOVE_REGION_PEER);
break;
case REMOVE_REGION_PEER:
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java
index 21f3f2881a2..7c03ddf042e 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java
@@ -29,8 +29,16 @@ import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
+import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.exceptions.RaftException;
+import org.apache.ratis.protocol.exceptions.ReconfigurationInProgressException;
+import org.apache.ratis.protocol.exceptions.ReconfigurationTimeoutException;
+import org.apache.ratis.protocol.exceptions.ServerNotReadyException;
+import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.retry.ExponentialBackoffRetry;
+import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
import org.apache.ratis.util.TimeDuration;
@@ -120,6 +128,48 @@ class RatisClient implements AutoCloseable {
}
}
+ static class EndlessRetryFactory extends BaseClientFactory<RaftGroup,
RatisClient> {
+
+ private final RaftProperties raftProperties;
+ private final RaftClientRpc clientRpc;
+ private final RatisConfig.Client config;
+
+ public EndlessRetryFactory(
+ ClientManager<RaftGroup, RatisClient> clientManager,
+ RaftProperties raftProperties,
+ RaftClientRpc clientRpc,
+ RatisConfig.Client config) {
+ super(clientManager);
+ this.raftProperties = raftProperties;
+ this.clientRpc = clientRpc;
+ this.config = config;
+ }
+
+ @Override
+ public void destroyObject(RaftGroup key, PooledObject<RatisClient>
pooledObject) {
+ pooledObject.getObject().invalidate();
+ }
+
+ @Override
+ public PooledObject<RatisClient> makeObject(RaftGroup group) {
+ return new DefaultPooledObject<>(
+ new RatisClient(
+ group,
+ RaftClient.newBuilder()
+ .setProperties(raftProperties)
+ .setRaftGroup(group)
+ .setRetryPolicy(new RatisEndlessRetryPolicy())
+ .setClientRpc(clientRpc)
+ .build(),
+ clientManager));
+ }
+
+ @Override
+ public boolean validateObject(RaftGroup key, PooledObject<RatisClient>
pooledObject) {
+ return true;
+ }
+ }
+
/**
* RatisRetryPolicy is similar to ExceptionDependentRetry 1. By default, use
* ExponentialBackoffRetry to handle request failure 2. If unexpected
IOException is caught,
@@ -167,4 +217,32 @@ class RatisClient implements AutoCloseable {
return defaultPolicy.handleAttemptFailure(event);
}
}
+
+ /** This policy is used to raft configuration change */
+ private static class RatisEndlessRetryPolicy implements RetryPolicy {
+
+ private static final Logger logger =
LoggerFactory.getLogger(RatisEndlessRetryPolicy.class);
+ private final RetryPolicy defaultPolicy;
+
+ RatisEndlessRetryPolicy() {
+ defaultPolicy =
+ RetryPolicies.retryForeverWithSleep(TimeDuration.valueOf(2,
TimeUnit.SECONDS));
+ }
+
+ @Override
+ public Action handleAttemptFailure(Event event) {
+ if (event.getCause() == null
+ || event.getCause() instanceof ReconfigurationInProgressException
+ || event.getCause() instanceof TimeoutIOException
+ || event.getCause() instanceof LeaderSteppingDownException
+ || event.getCause() instanceof ReconfigurationTimeoutException
+ || event.getCause() instanceof ServerNotReadyException
+ || event.getCause() instanceof NotLeaderException
+ || event.getCause() instanceof LeaderNotReadyException) {
+ return defaultPolicy.handleAttemptFailure(event);
+ }
+
+ return defaultPolicy.handleAttemptFailure(event);
+ }
+ }
}
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index d4bb8d55c8a..f346d90ac35 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -122,6 +122,7 @@ class RatisConsensus implements IConsensus {
private final RaftClientRpc clientRpc;
private final IClientManager<RaftGroup, RatisClient> clientManager;
+ private final IClientManager<RaftGroup, RatisClient>
reconfigurationClientManager;
private final DiskGuardian diskGuardian;
@@ -190,7 +191,10 @@ class RatisConsensus implements IConsensus {
clientManager =
new IClientManager.Factory<RaftGroup, RatisClient>()
- .createClientManager(new RatisClientPoolFactory());
+ .createClientManager(new RatisClientPoolFactory(false));
+ reconfigurationClientManager =
+ new IClientManager.Factory<RaftGroup, RatisClient>()
+ .createClientManager(new RatisClientPoolFactory(true));
clientRpc = new GrpcFactory(new
Parameters()).newRaftClientRpc(ClientId.randomId(), properties);
@@ -228,6 +232,7 @@ class RatisConsensus implements IConsensus {
Thread.currentThread().interrupt();
} finally {
clientManager.close();
+ reconfigurationClientManager.close();
server.get().close();
MetricService.getInstance().removeMetricSet(this.ratisMetricSet);
}
@@ -581,9 +586,7 @@ class RatisConsensus implements IConsensus {
final RaftGroup raftGroup =
Optional.ofNullable(getGroupInfo(raftGroupId))
.orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
-
final RaftPeer newRaftLeader =
Utils.fromPeerAndPriorityToRaftPeer(newLeader, DEFAULT_PRIORITY);
-
final RaftClientReply reply;
try {
reply = transferLeader(raftGroup, newRaftLeader);
@@ -839,6 +842,7 @@ class RatisConsensus implements IConsensus {
if (lastSeenGroup != null && !lastSeenGroup.equals(raftGroup)) {
// delete the pooled raft-client of the out-dated group and cache the
latest
clientManager.clear(lastSeenGroup);
+ reconfigurationClientManager.clear(lastSeenGroup);
lastSeen.put(raftGroupId, raftGroup);
}
} catch (IOException e) {
@@ -863,11 +867,21 @@ class RatisConsensus implements IConsensus {
}
}
+ private RatisClient getConfigurationRaftClient(RaftGroup group) throws
ClientManagerException {
+ try {
+ return reconfigurationClientManager.borrowClient(group);
+ } catch (ClientManagerException e) {
+ logger.error("Borrow client from pool for group {} failed.", group, e);
+ // rethrow the exception
+ throw e;
+ }
+ }
+
private RaftClientReply sendReconfiguration(RaftGroup newGroupConf)
throws RatisRequestFailedException {
// notify the group leader of configuration change
RaftClientReply reply;
- try (RatisClient client = getRaftClient(newGroupConf)) {
+ try (RatisClient client = getConfigurationRaftClient(newGroupConf)) {
reply =
client.getRaftClient().admin().setConfiguration(new
ArrayList<>(newGroupConf.getPeers()));
if (!reply.isSuccess()) {
@@ -900,12 +914,21 @@ class RatisConsensus implements IConsensus {
private class RatisClientPoolFactory implements
IClientPoolFactory<RaftGroup, RatisClient> {
+ private final boolean isReconfiguration;
+
+ RatisClientPoolFactory(boolean isReconfiguration) {
+ this.isReconfiguration = isReconfiguration;
+ }
+
@Override
public KeyedObjectPool<RaftGroup, RatisClient> createClientPool(
ClientManager<RaftGroup, RatisClient> manager) {
GenericKeyedObjectPool<RaftGroup, RatisClient> clientPool =
new GenericKeyedObjectPool<>(
- new RatisClient.Factory(manager, properties, clientRpc,
config.getClient()),
+ isReconfiguration
+ ? new RatisClient.EndlessRetryFactory(
+ manager, properties, clientRpc, config.getClient())
+ : new RatisClient.Factory(manager, properties, clientRpc,
config.getClient()),
new ClientPoolProperty.Builder<RatisClient>()
.setMaxClientNumForEachNode(config.getClient().getMaxClientNumForEachNode())
.build()