This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c310cb0c3eb IoTV2: Fix consensus pipe operation timeout (#14399)
c310cb0c3eb is described below
commit c310cb0c3eb61cbc53168a3150d569c0ac5ee7a5
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Dec 13 12:40:53 2024 +0800
IoTV2: Fix consensus pipe operation timeout (#14399)
* Fix consensus pipe operation timeout
* Remove the deleted group from stateMachineMap and drop its per-group lock in deleteLocalPeer
---
.../client/sync/SyncDataNodeClientPool.java | 2 +-
.../apache/iotdb/consensus/pipe/PipeConsensus.java | 28 ++++++++++++++++------
.../thrift/impl/DataNodeRegionManager.java | 4 ++--
3 files changed, 24 insertions(+), 10 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
index 2999bbdec58..9a063900c0e 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
@@ -59,7 +59,7 @@ public class SyncDataNodeClientPool {
private static final Logger LOGGER =
LoggerFactory.getLogger(SyncDataNodeClientPool.class);
- private static final int DEFAULT_RETRY_NUM = 6;
+ private static final int DEFAULT_RETRY_NUM = 10;
private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
clientManager;
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
index e7a02236f7d..ce5acf73cc3 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
@@ -74,7 +74,9 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import static
org.apache.iotdb.consensus.iot.IoTConsensus.getConsensusGroupIdsFromDir;
@@ -92,7 +94,9 @@ public class PipeConsensus implements IConsensus {
new ConcurrentHashMap<>();
private final PipeConsensusRPCService rpcService;
private final RegisterManager registerManager = new RegisterManager();
- private final ReentrantLock stateMachineMapLock = new ReentrantLock();
+ private final Map<ConsensusGroupId, ReentrantLock>
consensusGroupIdReentrantLockMap =
+ new ConcurrentHashMap<>();
+ private final ReentrantReadWriteLock stateMachineMapLock = new
ReentrantReadWriteLock();
private final PipeConsensusConfig config;
private final ConsensusPipeManager consensusPipeManager;
private final ConsensusPipeGuardian consensusPipeGuardian;
@@ -190,7 +194,7 @@ public class PipeConsensus implements IConsensus {
entry -> entry.getKey().getConsensusGroupId(),
Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
try {
- stateMachineMapLock.lock();
+ stateMachineMapLock.writeLock().lock();
stateMachineMap.forEach(
(key, value) ->
value.checkConsensusPipe(existedPipes.getOrDefault(key,
ImmutableMap.of())));
@@ -214,7 +218,7 @@ public class PipeConsensus implements IConsensus {
}
});
} finally {
- stateMachineMapLock.unlock();
+ stateMachineMapLock.writeLock().unlock();
}
}
@@ -263,8 +267,11 @@ public class PipeConsensus implements IConsensus {
throw new IllegalPeerEndpointException(thisNode, peers);
}
+ Lock lock =
+ consensusGroupIdReentrantLockMap.computeIfAbsent(groupId, key -> new
ReentrantLock());
try {
- stateMachineMapLock.lock();
+ lock.lock();
+ stateMachineMapLock.readLock().lock();
if (stateMachineMap.containsKey(groupId)) {
throw new ConsensusGroupAlreadyExistException(groupId);
}
@@ -293,21 +300,26 @@ public class PipeConsensus implements IConsensus {
LOGGER.warn("Cannot create local peer for group {} with peers {}",
groupId, peers, e);
throw new ConsensusException(e);
} finally {
- stateMachineMapLock.unlock();
+ stateMachineMapLock.readLock().unlock();
+ lock.unlock();
}
}
@Override
public void deleteLocalPeer(ConsensusGroupId groupId) throws
ConsensusException {
KillPoint.setKillPoint(IoTConsensusDeleteLocalPeerKillPoints.BEFORE_DELETE);
+ Lock lock =
+ consensusGroupIdReentrantLockMap.computeIfAbsent(groupId, key -> new
ReentrantLock());
try {
- stateMachineMapLock.lock();
+ lock.lock();
+ stateMachineMapLock.readLock().lock();
if (!stateMachineMap.containsKey(groupId)) {
throw new ConsensusGroupNotExistException(groupId);
}
final PipeConsensusServerImpl consensus = stateMachineMap.get(groupId);
consensus.clear();
+ stateMachineMap.remove(groupId);
FileUtils.deleteFileOrDirectory(new File(getPeerDir(groupId)));
KillPoint.setKillPoint(IoTConsensusDeleteLocalPeerKillPoints.AFTER_DELETE);
@@ -315,7 +327,9 @@ public class PipeConsensus implements IConsensus {
LOGGER.warn("Cannot delete local peer for group {}", groupId, e);
throw new ConsensusException(e);
} finally {
- stateMachineMapLock.unlock();
+ stateMachineMapLock.readLock().unlock();
+ lock.unlock();
+ consensusGroupIdReentrantLockMap.remove(groupId);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java
index a6809471f4a..6581a77735d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java
@@ -179,11 +179,11 @@ public class DataNodeRegionManager {
if (regionId instanceof DataRegionId) {
DataRegionId dataRegionId = (DataRegionId) regionId;
storageEngine.createDataRegion(dataRegionId, storageGroup);
- dataRegionLockMap.put(dataRegionId, new ReentrantReadWriteLock(false));
+ dataRegionLockMap.putIfAbsent(dataRegionId, new
ReentrantReadWriteLock(false));
} else {
SchemaRegionId schemaRegionId = (SchemaRegionId) regionId;
schemaEngine.createSchemaRegion(new PartialPath(storageGroup),
schemaRegionId);
- schemaRegionLockMap.put(schemaRegionId, new
ReentrantReadWriteLock(false));
+ schemaRegionLockMap.putIfAbsent(schemaRegionId, new
ReentrantReadWriteLock(false));
}
} catch (Exception e) {
LOGGER.error("create new region {} error", regionId, e);