This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c310cb0c3eb IoTV2: Fix consensus pipe operation time out (#14399)
c310cb0c3eb is described below

commit c310cb0c3eb61cbc53168a3150d569c0ac5ee7a5
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Dec 13 12:40:53 2024 +0800

    IoTV2: Fix consensus pipe operation timeout (#14399)
    
    * fix consensus pipe operation timeout
    
    * remove the group from stateMachineMap and drop its per-group lock in deleteLocalPeer
---
 .../client/sync/SyncDataNodeClientPool.java        |  2 +-
 .../apache/iotdb/consensus/pipe/PipeConsensus.java | 28 ++++++++++++++++------
 .../thrift/impl/DataNodeRegionManager.java         |  4 ++--
 3 files changed, 24 insertions(+), 10 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
index 2999bbdec58..9a063900c0e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
@@ -59,7 +59,7 @@ public class SyncDataNodeClientPool {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SyncDataNodeClientPool.class);
 
-  private static final int DEFAULT_RETRY_NUM = 6;
+  private static final int DEFAULT_RETRY_NUM = 10;
 
   private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> 
clientManager;
 
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
index e7a02236f7d..ce5acf73cc3 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
@@ -74,7 +74,9 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 import static 
org.apache.iotdb.consensus.iot.IoTConsensus.getConsensusGroupIdsFromDir;
@@ -92,7 +94,9 @@ public class PipeConsensus implements IConsensus {
       new ConcurrentHashMap<>();
   private final PipeConsensusRPCService rpcService;
   private final RegisterManager registerManager = new RegisterManager();
-  private final ReentrantLock stateMachineMapLock = new ReentrantLock();
+  private final Map<ConsensusGroupId, ReentrantLock> 
consensusGroupIdReentrantLockMap =
+      new ConcurrentHashMap<>();
+  private final ReentrantReadWriteLock stateMachineMapLock = new 
ReentrantReadWriteLock();
   private final PipeConsensusConfig config;
   private final ConsensusPipeManager consensusPipeManager;
   private final ConsensusPipeGuardian consensusPipeGuardian;
@@ -190,7 +194,7 @@ public class PipeConsensus implements IConsensus {
                     entry -> entry.getKey().getConsensusGroupId(),
                     Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
     try {
-      stateMachineMapLock.lock();
+      stateMachineMapLock.writeLock().lock();
       stateMachineMap.forEach(
           (key, value) ->
               value.checkConsensusPipe(existedPipes.getOrDefault(key, 
ImmutableMap.of())));
@@ -214,7 +218,7 @@ public class PipeConsensus implements IConsensus {
                 }
               });
     } finally {
-      stateMachineMapLock.unlock();
+      stateMachineMapLock.writeLock().unlock();
     }
   }
 
@@ -263,8 +267,11 @@ public class PipeConsensus implements IConsensus {
       throw new IllegalPeerEndpointException(thisNode, peers);
     }
 
+    Lock lock =
+        consensusGroupIdReentrantLockMap.computeIfAbsent(groupId, key -> new 
ReentrantLock());
     try {
-      stateMachineMapLock.lock();
+      lock.lock();
+      stateMachineMapLock.readLock().lock();
       if (stateMachineMap.containsKey(groupId)) {
         throw new ConsensusGroupAlreadyExistException(groupId);
       }
@@ -293,21 +300,26 @@ public class PipeConsensus implements IConsensus {
       LOGGER.warn("Cannot create local peer for group {} with peers {}", 
groupId, peers, e);
       throw new ConsensusException(e);
     } finally {
-      stateMachineMapLock.unlock();
+      stateMachineMapLock.readLock().unlock();
+      lock.unlock();
     }
   }
 
   @Override
   public void deleteLocalPeer(ConsensusGroupId groupId) throws 
ConsensusException {
     
KillPoint.setKillPoint(IoTConsensusDeleteLocalPeerKillPoints.BEFORE_DELETE);
+    Lock lock =
+        consensusGroupIdReentrantLockMap.computeIfAbsent(groupId, key -> new 
ReentrantLock());
     try {
-      stateMachineMapLock.lock();
+      lock.lock();
+      stateMachineMapLock.readLock().lock();
       if (!stateMachineMap.containsKey(groupId)) {
         throw new ConsensusGroupNotExistException(groupId);
       }
 
       final PipeConsensusServerImpl consensus = stateMachineMap.get(groupId);
       consensus.clear();
+      stateMachineMap.remove(groupId);
 
       FileUtils.deleteFileOrDirectory(new File(getPeerDir(groupId)));
       
KillPoint.setKillPoint(IoTConsensusDeleteLocalPeerKillPoints.AFTER_DELETE);
@@ -315,7 +327,9 @@ public class PipeConsensus implements IConsensus {
       LOGGER.warn("Cannot delete local peer for group {}", groupId, e);
       throw new ConsensusException(e);
     } finally {
-      stateMachineMapLock.unlock();
+      stateMachineMapLock.readLock().unlock();
+      lock.unlock();
+      consensusGroupIdReentrantLockMap.remove(groupId);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java
index a6809471f4a..6581a77735d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java
@@ -179,11 +179,11 @@ public class DataNodeRegionManager {
       if (regionId instanceof DataRegionId) {
         DataRegionId dataRegionId = (DataRegionId) regionId;
         storageEngine.createDataRegion(dataRegionId, storageGroup);
-        dataRegionLockMap.put(dataRegionId, new ReentrantReadWriteLock(false));
+        dataRegionLockMap.putIfAbsent(dataRegionId, new 
ReentrantReadWriteLock(false));
       } else {
         SchemaRegionId schemaRegionId = (SchemaRegionId) regionId;
         schemaEngine.createSchemaRegion(new PartialPath(storageGroup), 
schemaRegionId);
-        schemaRegionLockMap.put(schemaRegionId, new 
ReentrantReadWriteLock(false));
+        schemaRegionLockMap.putIfAbsent(schemaRegionId, new 
ReentrantReadWriteLock(false));
       }
     } catch (Exception e) {
       LOGGER.error("create new region {} error", regionId, e);

Reply via email to