This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e08e68a0d4   IoTConsensusV2 X [Region Migration] Each component uses 
an independent ClientManager && Filter out the consensus pipe when judging 
resource release #14052
7e08e68a0d4 is described below

commit 7e08e68a0d4931b4d57f7acd7ee23d820106aaf4
Author: Peng Junzhi <[email protected]>
AuthorDate: Mon Nov 11 23:52:32 2024 +0800

      IoTConsensusV2 X [Region Migration] Each component uses an independent 
ClientManager && Filter out the consensus pipe when judging resource release 
#14052
---
 .../apache/iotdb/consensus/pipe/PipeConsensus.java | 13 ++++++++++---
 .../consensus/pipe/PipeConsensusServerImpl.java    |  1 +
 .../pipeconsensus/PipeConsensusAsyncConnector.java |  2 +-
 .../pipeconsensus/PipeConsensusSyncConnector.java  |  2 +-
 .../container/PipeConsensusClientMgrContainer.java | 22 ++++++++--------------
 .../commons/pipe/agent/task/PipeTaskManager.java   |  7 ++++---
 6 files changed, 25 insertions(+), 22 deletions(-)

diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
index 4c31182b3d9..8a10a3df70a 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
@@ -111,8 +111,8 @@ public class PipeConsensus implements IConsensus {
             config.getPipeConsensusConfig().getReplicateMode());
     this.consensusPipeGuardian =
         config.getPipeConsensusConfig().getPipe().getConsensusPipeGuardian();
-    this.asyncClientManager = 
PipeConsensusClientMgrContainer.getInstance().getAsyncClientManager();
-    this.syncClientManager = 
PipeConsensusClientMgrContainer.getInstance().getSyncClientManager();
+    this.asyncClientManager = 
PipeConsensusClientMgrContainer.getInstance().newAsyncClientManager();
+    this.syncClientManager = 
PipeConsensusClientMgrContainer.getInstance().newSyncClientManager();
   }
 
   @Override
@@ -373,16 +373,23 @@ public class PipeConsensus implements IConsensus {
 
     try {
       // let target peer reject new write
+      LOGGER.info("[{}] inactivate peer {}", CLASS_NAME, peer);
       impl.setRemotePeerActive(peer, false);
       
KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.AFTER_INACTIVE_PEER);
+
       // wait its consensus pipes to complete
+      LOGGER.info("[{}] wait target peer{} complete transfer...", CLASS_NAME, 
peer);
       impl.waitTargetPeerToPeersTransmissionCompleted(peer);
+
       // remove consensus pipes between target peer and other peers
+      LOGGER.info("[{}] notify other peers to drop consensus pipes...", 
CLASS_NAME);
       impl.notifyPeersToDropConsensusPipe(peer);
+
       // wait target peer to release all resource
+      LOGGER.info("[{}] wait {} to release all resource...", CLASS_NAME, peer);
       impl.waitReleaseAllRegionRelatedResource(peer);
     } catch (ConsensusGroupModifyPeerException e) {
-      throw new ConsensusException(e.getMessage());
+      throw new ConsensusException(e);
     }
     KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.FINISH);
   }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
index 68b50c1c305..aec583e84cc 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
@@ -530,6 +530,7 @@ public class PipeConsensusServerImpl {
 
         final List<String> consensusPipeNames =
             peerManager.getPeers().stream()
+                .filter(peer -> !peer.equals(targetPeer))
                 .map(peer -> new ConsensusPipeName(targetPeer, 
peer).toString())
                 .collect(Collectors.toList());
         isTransmissionCompleted =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
index 2ba68560126..4f56d1a847b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
@@ -142,7 +142,7 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector implements Conse
             nodeUrls, consensusGroupId, thisDataNodeId, 
pipeConsensusConnectorMetrics);
     retryConnector.customize(parameters, configuration);
     asyncTransferClientManager =
-        PipeConsensusClientMgrContainer.getInstance().getAsyncClientManager();
+        PipeConsensusClientMgrContainer.getInstance().newAsyncClientManager();
 
     if (isTabletBatchModeEnabled) {
       tabletBatchBuilder =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
index 96602638010..01b406674c9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
@@ -93,7 +93,7 @@ public class PipeConsensusSyncConnector extends 
IoTDBConnector {
     this.consensusGroupId = consensusGroupId;
     this.thisDataNodeId = thisDataNodeId;
     this.syncRetryClientManager =
-        PipeConsensusClientMgrContainer.getInstance().getSyncClientManager();
+        PipeConsensusClientMgrContainer.getInstance().newSyncClientManager();
     this.pipeConsensusConnectorMetrics = pipeConsensusConnectorMetrics;
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/PipeConsensusClientMgrContainer.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/PipeConsensusClientMgrContainer.java
index 6fc04163433..383b995104b 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/PipeConsensusClientMgrContainer.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/PipeConsensusClientMgrContainer.java
@@ -39,32 +39,26 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
  */
 public class PipeConsensusClientMgrContainer {
   private static final CommonConfig CONF = 
CommonDescriptor.getInstance().getConfig();
-  private final IClientManager<TEndPoint, AsyncPipeConsensusServiceClient> 
asyncClientManager;
-  private final IClientManager<TEndPoint, SyncPipeConsensusServiceClient> 
syncClientManager;
+  private final PipeConsensusClientProperty config;
 
   private PipeConsensusClientMgrContainer() {
     // load rpc client config
-    PipeConsensusClientProperty config =
+    this.config =
         PipeConsensusClientProperty.newBuilder()
             
.setIsRpcThriftCompressionEnabled(CONF.isRpcThriftCompressionEnabled())
             .setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
             
.setSelectorNumOfClientManager(CONF.getSelectorNumOfClientManager())
             .build();
-
-    this.asyncClientManager =
-        new IClientManager.Factory<TEndPoint, 
AsyncPipeConsensusServiceClient>()
-            .createClientManager(new 
AsyncPipeConsensusServiceClientPoolFactory(config));
-    this.syncClientManager =
-        new IClientManager.Factory<TEndPoint, SyncPipeConsensusServiceClient>()
-            .createClientManager(new 
SyncPipeConsensusServiceClientPoolFactory(config));
   }
 
-  public IClientManager<TEndPoint, AsyncPipeConsensusServiceClient> 
getAsyncClientManager() {
-    return asyncClientManager;
+  public IClientManager<TEndPoint, AsyncPipeConsensusServiceClient> 
newAsyncClientManager() {
+    return new IClientManager.Factory<TEndPoint, 
AsyncPipeConsensusServiceClient>()
+        .createClientManager(new 
AsyncPipeConsensusServiceClientPoolFactory(config));
   }
 
-  public IClientManager<TEndPoint, SyncPipeConsensusServiceClient> 
getSyncClientManager() {
-    return syncClientManager;
+  public IClientManager<TEndPoint, SyncPipeConsensusServiceClient> 
newSyncClientManager() {
+    return new IClientManager.Factory<TEndPoint, 
SyncPipeConsensusServiceClient>()
+        .createClientManager(new 
SyncPipeConsensusServiceClientPoolFactory(config));
   }
 
   private static class PipeConsensusClientMgrContainerHolder {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskManager.java
index 45c993f05da..70e959e66f0 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskManager.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.commons.pipe.agent.task;
 
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeType;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -115,9 +116,9 @@ public class PipeTaskManager {
    *     otherwise
    */
   public synchronized boolean hasPipeTaskInConsensusGroup(final int 
consensusGroupId) {
-    return pipeMap.values().stream()
-        .anyMatch(
-            consensusGroupId2PipeTask -> 
consensusGroupId2PipeTask.containsKey(consensusGroupId));
+    return pipeMap.entrySet().stream()
+        .filter(entry -> entry.getKey().getPipeType() != PipeType.CONSENSUS)
+        .anyMatch(entry -> entry.getValue().containsKey(consensusGroupId));
   }
 
   /**

Reply via email to