This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 35ef0c8ba42 Fix deadlock between DataNode createDataRegion and 
ConfigNode PipeTaskCoordinatorLock by delegating consensus pipe lifecycle to 
ConfigNode (#17233)
35ef0c8ba42 is described below

commit 35ef0c8ba4221d5fcba5f658e54f617f40001e28
Author: Peng Junzhi <[email protected]>
AuthorDate: Wed Mar 4 14:19:18 2026 +0800

    Fix deadlock between DataNode createDataRegion and ConfigNode 
PipeTaskCoordinatorLock by delegating consensus pipe lifecycle to ConfigNode 
(#17233)
    
    * Fix deadlock between DataNode createDataRegion and ConfigNode 
PipeTaskCoordinatorLock by delegating consensus pipe lifecycle to ConfigNode
    
    * CN get IoTV2 replication mode
    
    * remove duplicated code
    
    * spotless
    
    * fix consensus pipe not being created when a data region is initially created
    
    * Create pipe asynchronously to avoid audit log deadlock
    
    * fix PipeTaskCoordinator not being shareable across threads
    
    * fix data race when DN and CN restart at the same time, and fix DN failing 
to clear the region file after its region was removed
    
    * refine code
    
    * remove useless comment
    
    * unlock for PipeTaskCoordinator no longer returns a value
---
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |  11 +
 .../confignode/conf/ConfigNodeDescriptor.java      |   3 +
 .../iotdb/confignode/manager/ProcedureManager.java |  32 +++
 .../pipe/coordinator/task/PipeTaskCoordinator.java |  14 +-
 .../coordinator/task/PipeTaskCoordinatorLock.java  |  19 +-
 .../subscription/SubscriptionCoordinator.java      |  10 +-
 .../procedure/env/RegionMaintainHandler.java       | 245 +++++++++++++++++++--
 .../impl/region/AddRegionPeerProcedure.java        |   5 +
 .../impl/region/CreateRegionGroupsProcedure.java   |   6 +
 .../impl/region/RemoveRegionPeerProcedure.java     |  45 +++-
 .../procedure/state/AddRegionPeerState.java        |   1 +
 .../procedure/state/CreateRegionGroupsState.java   |   5 +-
 .../procedure/state/RemoveRegionPeerState.java     |   1 +
 .../consensus/config/PipeConsensusConfig.java      |  16 --
 .../apache/iotdb/consensus/pipe/PipeConsensus.java |  39 ++--
 .../consensus/pipe/PipeConsensusServerImpl.java    | 223 +++----------------
 .../consensuspipe/ConsensusPipeDispatcher.java     |  42 ----
 .../pipe/consensuspipe/ConsensusPipeManager.java   | 157 -------------
 .../service/PipeConsensusRPCServiceProcessor.java  |   3 +-
 .../db/consensus/DataRegionConsensusImpl.java      |   2 -
 .../consensus/ConsensusPipeDataNodeDispatcher.java | 131 -----------
 .../java/org/apache/iotdb/db/service/DataNode.java |  61 ++---
 22 files changed, 426 insertions(+), 645 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 88e8d76001d..3abb322d084 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -77,6 +77,9 @@ public class ConfigNodeConfig {
   /** Data region consensus protocol. */
   private String dataRegionConsensusProtocolClass = 
ConsensusFactory.IOT_CONSENSUS;
 
+  /** IoTConsensusV2 replicate mode: "batch" or "stream". */
+  private String iotConsensusV2Mode = "batch";
+
   /** Default number of DataRegion replicas. */
   private int dataReplicationFactor = 1;
 
@@ -530,6 +533,14 @@ public class ConfigNodeConfig {
     this.dataRegionConsensusProtocolClass = dataRegionConsensusProtocolClass;
   }
 
+  public String getIotConsensusV2Mode() {
+    return iotConsensusV2Mode;
+  }
+
+  public void setIotConsensusV2Mode(String iotConsensusV2Mode) {
+    this.iotConsensusV2Mode = iotConsensusV2Mode;
+  }
+
   public int getDataRegionPerDataNode() {
     return dataRegionPerDataNode;
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 0ea7a278732..77790dae1a9 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -205,6 +205,9 @@ public class ConfigNodeDescriptor {
         properties.getProperty(
             "data_region_consensus_protocol_class", 
conf.getDataRegionConsensusProtocolClass()));
 
+    conf.setIotConsensusV2Mode(
+        properties.getProperty("iot_consensus_v2_mode", 
conf.getIotConsensusV2Mode()));
+
     conf.setDataReplicationFactor(
         Integer.parseInt(
             properties.getProperty(
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 0fe3abc79a7..646aaf66daf 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -1485,6 +1485,23 @@ public class ProcedureManager {
     }
   }
 
+  /**
+   * Submit a consensus pipe creation procedure without blocking. The 
procedure will execute in the
+   * background. Failures are logged and can be repaired by the consensus pipe 
guardian.
+   */
+  public void createConsensusPipeAsync(TCreatePipeReq req) {
+    try {
+      CreatePipeProcedureV2 procedure = new CreatePipeProcedureV2(req);
+      executor.submitProcedure(procedure);
+      LOGGER.info("Submitted async consensus pipe creation: {}", 
req.getPipeName());
+    } catch (Exception e) {
+      LOGGER.warn(
+          "Failed to submit async consensus pipe creation for {}: {}",
+          req.getPipeName(),
+          e.getMessage());
+    }
+  }
+
   public TSStatus createPipe(TCreatePipeReq req) {
     try {
       CreatePipeProcedureV2 procedure = new CreatePipeProcedureV2(req);
@@ -1579,6 +1596,21 @@ public class ProcedureManager {
     }
   }
 
+  /**
+   * Submit a consensus pipe drop procedure without blocking. The procedure 
will execute in the
+   * background. Failures are logged and can be repaired by the consensus pipe 
guardian.
+   */
+  public void dropConsensusPipeAsync(String pipeName) {
+    try {
+      DropPipeProcedureV2 procedure = new DropPipeProcedureV2(pipeName);
+      executor.submitProcedure(procedure);
+      LOGGER.info("Submitted async consensus pipe drop: {}", pipeName);
+    } catch (Exception e) {
+      LOGGER.warn(
+          "Failed to submit async consensus pipe drop for {}: {}", pipeName, 
e.getMessage());
+    }
+  }
+
   public TSStatus dropPipe(String pipeName) {
     try {
       DropPipeProcedureV2 procedure = new DropPipeProcedureV2(pipeName);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
index c0b81c2e43c..ea9c61cf45e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
@@ -85,19 +85,9 @@ public class PipeTaskCoordinator {
   /**
    * Unlock the pipe task coordinator. Calling this method will clear the pipe 
task info holder,
    * which means that the holder will be null after calling this method.
-   *
-   * @return {@code true} if successfully unlocked, {@code false} if current 
thread is not holding
-   *     the lock.
    */
-  public boolean unlock() {
-    try {
-      pipeTaskCoordinatorLock.unlock();
-      return true;
-    } catch (IllegalMonitorStateException ignored) {
-      // This is thrown if unlock() is called without lock() called first.
-      LOGGER.warn("This thread is not holding the lock.");
-      return false;
-    }
+  public void unlock() {
+    pipeTaskCoordinatorLock.unlock();
   }
 
   public boolean isLocked() {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
index 12b92619004..b86c556f20d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
@@ -22,24 +22,29 @@ package 
org.apache.iotdb.confignode.manager.pipe.coordinator.task;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
 
 /**
- * {@link PipeTaskCoordinatorLock} is a cross thread lock for pipe task 
coordinator. It is used to
+ * {@link PipeTaskCoordinatorLock} is a cross-thread lock for pipe task 
coordinator. It is used to
  * ensure that only one thread can execute the pipe task coordinator at the 
same time.
+ *
+ * <p>Uses {@link Semaphore} instead of {@link 
java.util.concurrent.locks.ReentrantLock} to support
+ * cross-thread acquire/release, which is required by the procedure recovery 
mechanism: locks may be
+ * acquired on the StateMachineUpdater thread during {@code restoreLock()} and 
released on a
+ * ProcedureCoreWorker thread after execution.
  */
 public class PipeTaskCoordinatorLock {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTaskCoordinatorLock.class);
 
-  private final ReentrantLock lock = new ReentrantLock();
+  private final Semaphore semaphore = new Semaphore(1);
 
   public void lock() {
     LOGGER.debug(
         "PipeTaskCoordinator lock waiting for thread {}", 
Thread.currentThread().getName());
     try {
-      lock.lockInterruptibly();
+      semaphore.acquire();
       LOGGER.debug(
           "PipeTaskCoordinator lock acquired by thread {}", 
Thread.currentThread().getName());
     } catch (final InterruptedException e) {
@@ -54,7 +59,7 @@ public class PipeTaskCoordinatorLock {
     try {
       LOGGER.debug(
           "PipeTaskCoordinator lock waiting for thread {}", 
Thread.currentThread().getName());
-      if (lock.tryLock(10, TimeUnit.SECONDS)) {
+      if (semaphore.tryAcquire(10, TimeUnit.SECONDS)) {
         LOGGER.debug(
             "PipeTaskCoordinator lock acquired by thread {}", 
Thread.currentThread().getName());
         return true;
@@ -74,12 +79,12 @@ public class PipeTaskCoordinatorLock {
   }
 
   public void unlock() {
-    lock.unlock();
+    semaphore.release();
     LOGGER.debug(
         "PipeTaskCoordinator lock released by thread {}", 
Thread.currentThread().getName());
   }
 
   public boolean isLocked() {
-    return lock.isLocked();
+    return semaphore.availablePermits() == 0;
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
index b52f958d30a..038167ae58c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
@@ -105,14 +105,8 @@ public class SubscriptionCoordinator {
       subscriptionInfoHolder = null;
     }
 
-    try {
-      coordinatorLock.unlock();
-      return true;
-    } catch (IllegalMonitorStateException ignored) {
-      // This is thrown if unlock() is called without lock() called first.
-      LOGGER.warn("This thread is not holding the lock.");
-      return false;
-    }
+    coordinatorLock.unlock();
+    return true;
   }
 
   public boolean isLocked() {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
index 928f3cfcd37..f827adea5d4 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
@@ -32,6 +32,9 @@ import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
 import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
 import 
org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
@@ -42,9 +45,12 @@ import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import 
org.apache.iotdb.confignode.consensus.request.write.partition.AddRegionLocationPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.partition.RemoveRegionLocationPlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import 
org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupHeartbeatSample;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
 import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
 import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeResp;
@@ -64,6 +70,23 @@ import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_CONSENSUS_GROUP_ID_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_CONSENSUS_PIPE_NAME;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_IP_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PORT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_CAPTURE_TABLE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_CAPTURE_TREE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_CONSENSUS_GROUP_ID_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_CONSENSUS_SENDER_DATANODE_ID_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_INCLUSION_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_MODE_KEY;
 import static 
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REGION_MIGRATE_PROCESS;
 import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS;
 import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS_V2;
@@ -275,22 +298,15 @@ public class RegionMaintainHandler {
     TMaintainPeerReq maintainPeerReq =
         new TMaintainPeerReq(regionId, originalDataNode, procedureId);
 
+    // Always use full retries regardless of node status, because after a 
cluster crash the
+    // target DataNode may be Unknown but still in the process of restarting.
     status =
-        
configManager.getLoadManager().getNodeStatus(originalDataNode.getDataNodeId())
-                == NodeStatus.Unknown
-            ? (TSStatus)
-                SyncDataNodeClientPool.getInstance()
-                    .sendSyncRequestToDataNodeWithGivenRetry(
-                        originalDataNode.getInternalEndPoint(),
-                        maintainPeerReq,
-                        CnToDnSyncRequestType.DELETE_OLD_REGION_PEER,
-                        1)
-            : (TSStatus)
-                SyncDataNodeClientPool.getInstance()
-                    .sendSyncRequestToDataNodeWithRetry(
-                        originalDataNode.getInternalEndPoint(),
-                        maintainPeerReq,
-                        CnToDnSyncRequestType.DELETE_OLD_REGION_PEER);
+        (TSStatus)
+            SyncDataNodeClientPool.getInstance()
+                .sendSyncRequestToDataNodeWithRetry(
+                    originalDataNode.getInternalEndPoint(),
+                    maintainPeerReq,
+                    CnToDnSyncRequestType.DELETE_OLD_REGION_PEER);
     LOGGER.info(
         "{}, Send action deleteOldRegionPeer finished, regionId: {}, 
dataNodeId: {}",
         REGION_MIGRATE_PROCESS,
@@ -392,6 +408,205 @@ public class RegionMaintainHandler {
     
configManager.getLoadManager().getRouteBalancer().balanceRegionLeaderAndPriority();
   }
 
+  /**
+   * Create bidirectional consensus pipes between the target DataNode and all 
existing peers. Only
+   * applies to IoTConsensusV2 DataRegions. Called by AddRegionPeerProcedure 
before
+   * DO_ADD_REGION_PEER so that pipes exist before the coordinator starts data 
transfer.
+   */
+  public void createConsensusPipesForAddPeer(
+      TConsensusGroupId regionId, TDataNodeLocation targetDataNode) {
+    if (!isIoTConsensusV2DataRegion(regionId)) {
+      return;
+    }
+
+    List<TDataNodeLocation> existingLocations = findRegionLocations(regionId);
+    for (TDataNodeLocation existingLocation : existingLocations) {
+      if (existingLocation.getDataNodeId() == targetDataNode.getDataNodeId()) {
+        continue;
+      }
+      // Pipe: existingPeer → targetPeer
+      createSingleConsensusPipe(
+          regionId,
+          existingLocation.getDataNodeId(),
+          existingLocation.getDataRegionConsensusEndPoint(),
+          targetDataNode.getDataNodeId(),
+          targetDataNode.getDataRegionConsensusEndPoint());
+      // Pipe: targetPeer → existingPeer
+      createSingleConsensusPipe(
+          regionId,
+          targetDataNode.getDataNodeId(),
+          targetDataNode.getDataRegionConsensusEndPoint(),
+          existingLocation.getDataNodeId(),
+          existingLocation.getDataRegionConsensusEndPoint());
+    }
+  }
+
+  /**
+   * Create bidirectional consensus pipes among all peers for newly created 
RegionGroups. Only
+   * applies to IoTConsensusV2 DataRegions. Called by 
CreateRegionGroupsProcedure after all regions
+   * are activated.
+   */
+  public void createInitialConsensusPipes(CreateRegionGroupsPlan persistPlan) {
+    if (!IOT_CONSENSUS_V2.equals(CONF.getDataRegionConsensusProtocolClass())) {
+      return;
+    }
+
+    persistPlan
+        .getRegionGroupMap()
+        .forEach(
+            (database, regionReplicaSets) ->
+                regionReplicaSets.forEach(
+                    regionReplicaSet -> {
+                      TConsensusGroupId regionId = 
regionReplicaSet.getRegionId();
+                      if 
(!TConsensusGroupType.DataRegion.equals(regionId.getType())) {
+                        return;
+                      }
+                      List<TDataNodeLocation> locations = 
regionReplicaSet.getDataNodeLocations();
+                      for (int i = 0; i < locations.size(); i++) {
+                        for (int j = 0; j < locations.size(); j++) {
+                          if (i == j) {
+                            continue;
+                          }
+                          createSingleConsensusPipeAsync(
+                              regionId,
+                              locations.get(i).getDataNodeId(),
+                              
locations.get(i).getDataRegionConsensusEndPoint(),
+                              locations.get(j).getDataNodeId(),
+                              
locations.get(j).getDataRegionConsensusEndPoint());
+                        }
+                      }
+                    }));
+  }
+
+  /**
+   * Drop consensus pipes related to the target DataNode for a region. Only 
applies to
+   * IoTConsensusV2 DataRegions. Called by RemoveRegionPeerProcedure after 
DELETE_OLD_REGION_PEER.
+   */
+  public void dropConsensusPipesForRemovePeer(
+      TConsensusGroupId regionId, TDataNodeLocation targetDataNode) {
+    if (!isIoTConsensusV2DataRegion(regionId)) {
+      return;
+    }
+
+    DataRegionId dataRegionId = new DataRegionId(regionId.getId());
+    List<TDataNodeLocation> existingLocations = findRegionLocations(regionId);
+    for (TDataNodeLocation existingLocation : existingLocations) {
+      if (existingLocation.getDataNodeId() == targetDataNode.getDataNodeId()) {
+        continue;
+      }
+      // Drop pipe: existingPeer → targetPeer
+      String pipeName1 =
+          new ConsensusPipeName(
+                  dataRegionId, existingLocation.getDataNodeId(), 
targetDataNode.getDataNodeId())
+              .toString();
+      configManager.getProcedureManager().dropConsensusPipeAsync(pipeName1);
+      // Drop pipe: targetPeer → existingPeer
+      String pipeName2 =
+          new ConsensusPipeName(
+                  dataRegionId, targetDataNode.getDataNodeId(), 
existingLocation.getDataNodeId())
+              .toString();
+      configManager.getProcedureManager().dropConsensusPipeAsync(pipeName2);
+    }
+  }
+
+  private boolean isIoTConsensusV2DataRegion(TConsensusGroupId regionId) {
+    return TConsensusGroupType.DataRegion.equals(regionId.getType())
+        && IOT_CONSENSUS_V2.equals(CONF.getDataRegionConsensusProtocolClass());
+  }
+
+  private TCreatePipeReq buildConsensusPipeReq(
+      TConsensusGroupId regionId,
+      int senderNodeId,
+      TEndPoint senderEndpoint,
+      int receiverNodeId,
+      TEndPoint receiverEndpoint) {
+    DataRegionId dataRegionId = new DataRegionId(regionId.getId());
+    ConsensusPipeName pipeName = new ConsensusPipeName(dataRegionId, 
senderNodeId, receiverNodeId);
+
+    String replicateMode = CONF.getIotConsensusV2Mode();
+
+    Map<String, String> extractorAttributes = new HashMap<>();
+    extractorAttributes.put(EXTRACTOR_KEY, 
BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName());
+    extractorAttributes.put(EXTRACTOR_INCLUSION_KEY, "data");
+    extractorAttributes.put(EXTRACTOR_CONSENSUS_GROUP_ID_KEY, 
dataRegionId.toString());
+    extractorAttributes.put(
+        EXTRACTOR_CONSENSUS_SENDER_DATANODE_ID_KEY, 
String.valueOf(senderNodeId));
+    extractorAttributes.put(
+        EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY, 
String.valueOf(receiverNodeId));
+    extractorAttributes.put(EXTRACTOR_REALTIME_MODE_KEY, replicateMode);
+    extractorAttributes.put(EXTRACTOR_CAPTURE_TABLE_KEY, String.valueOf(true));
+    extractorAttributes.put(EXTRACTOR_CAPTURE_TREE_KEY, String.valueOf(true));
+    extractorAttributes.put(
+        EXTRACTOR_IOTDB_USER_KEY, 
CommonDescriptor.getInstance().getConfig().getDefaultAdminName());
+
+    Map<String, String> processorAttributes = new HashMap<>();
+    processorAttributes.put(
+        PROCESSOR_KEY, 
BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName());
+
+    Map<String, String> connectorAttributes = new HashMap<>();
+    connectorAttributes.put(
+        CONNECTOR_KEY, 
BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName());
+    connectorAttributes.put(CONNECTOR_CONSENSUS_GROUP_ID_KEY, 
String.valueOf(dataRegionId.getId()));
+    connectorAttributes.put(CONNECTOR_CONSENSUS_PIPE_NAME, 
pipeName.toString());
+    connectorAttributes.put(CONNECTOR_IOTDB_IP_KEY, receiverEndpoint.ip);
+    connectorAttributes.put(CONNECTOR_IOTDB_PORT_KEY, 
String.valueOf(receiverEndpoint.port));
+    connectorAttributes.put(CONNECTOR_IOTDB_PARALLEL_TASKS_KEY, 
String.valueOf(1));
+    connectorAttributes.put(CONNECTOR_REALTIME_FIRST_KEY, 
String.valueOf(false));
+
+    return new TCreatePipeReq()
+        .setPipeName(pipeName.toString())
+        .setNeedManuallyStart(false)
+        .setExtractorAttributes(extractorAttributes)
+        .setProcessorAttributes(processorAttributes)
+        .setConnectorAttributes(connectorAttributes);
+  }
+
+  /**
+   * Create a single consensus pipe synchronously (blocks until procedure 
finishes or times out with
+   * optimistic success). Used by AddRegionPeerProcedure where pipes must 
exist before data sync.
+   */
+  private void createSingleConsensusPipe(
+      TConsensusGroupId regionId,
+      int senderNodeId,
+      TEndPoint senderEndpoint,
+      int receiverNodeId,
+      TEndPoint receiverEndpoint) {
+    TCreatePipeReq req =
+        buildConsensusPipeReq(
+            regionId, senderNodeId, senderEndpoint, receiverNodeId, 
receiverEndpoint);
+    TSStatus status = 
configManager.getProcedureManager().createConsensusPipe(req);
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      LOGGER.warn(
+          "{}, Failed to create consensus pipe {}: {}",
+          REGION_MIGRATE_PROCESS,
+          req.getPipeName(),
+          status);
+    } else {
+      LOGGER.info("{}, Created consensus pipe {}", REGION_MIGRATE_PROCESS, 
req.getPipeName());
+    }
+  }
+
+  /**
+   * Create a single consensus pipe asynchronously (fire-and-forget). Used by
+   * CreateRegionGroupsProcedure where blocking would cause deadlock and new 
regions have no data to
+   * lose.
+   */
+  private void createSingleConsensusPipeAsync(
+      TConsensusGroupId regionId,
+      int senderNodeId,
+      TEndPoint senderEndpoint,
+      int receiverNodeId,
+      TEndPoint receiverEndpoint) {
+    TCreatePipeReq req =
+        buildConsensusPipeReq(
+            regionId, senderNodeId, senderEndpoint, receiverNodeId, 
receiverEndpoint);
+    configManager.getProcedureManager().createConsensusPipeAsync(req);
+    LOGGER.info(
+        "{}, Submitted async consensus pipe creation: {}",
+        REGION_MIGRATE_PROCESS,
+        req.getPipeName());
+  }
+
   /**
    * Find all DataNodes which contains the given regionId
    *
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
index f1907edf754..d9cd2ad8817 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
@@ -94,6 +94,11 @@ public class AddRegionPeerProcedure extends 
RegionOperationProcedure<AddRegionPe
           if (status.getCode() != SUCCESS_STATUS.getStatusCode()) {
             return warnAndRollBackAndNoMoreState(env, handler, 
"CREATE_NEW_REGION_PEER fail");
           }
+          setNextState(AddRegionPeerState.CREATE_CONSENSUS_PIPES);
+          break;
+        case CREATE_CONSENSUS_PIPES:
+          handler.createConsensusPipesForAddPeer(regionId, targetDataNode);
+          setKillPoint(state);
           setNextState(AddRegionPeerState.DO_ADD_REGION_PEER);
           break;
         case DO_ADD_REGION_PEER:
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java
index 14e4be60ed9..17c8b2abdf4 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java
@@ -225,6 +225,12 @@ public class CreateRegionGroupsProcedure
                           }
                         }));
         env.activateRegionGroup(activateRegionGroupMap);
+        setNextState(CreateRegionGroupsState.CREATE_INITIAL_CONSENSUS_PIPES);
+        break;
+      case CREATE_INITIAL_CONSENSUS_PIPES:
+        if (TConsensusGroupType.DataRegion.equals(consensusGroupType)) {
+          
env.getRegionMaintainHandler().createInitialConsensusPipes(persistPlan);
+        }
         setNextState(CreateRegionGroupsState.CREATE_REGION_GROUPS_FINISH);
         break;
       case CREATE_REGION_GROUPS_FINISH:
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
index f362a7a1008..9b4c29b095e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
@@ -45,14 +45,18 @@ import java.util.Objects;
 
 import static org.apache.iotdb.commons.utils.KillPoint.KillPoint.setKillPoint;
 import static 
org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState.DELETE_OLD_REGION_PEER;
+import static 
org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState.DROP_CONSENSUS_PIPES;
 import static 
org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState.REMOVE_REGION_LOCATION_CACHE;
 import static 
org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState.REMOVE_REGION_PEER;
 import static org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS;
 
 public class RemoveRegionPeerProcedure extends 
RegionOperationProcedure<RemoveRegionPeerState> {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(RemoveRegionPeerProcedure.class);
+  private static final int MAX_DELETE_OLD_REGION_PEER_RETRY = 3;
+  private static final long DELETE_OLD_REGION_PEER_RETRY_INTERVAL_MS = 5_000;
   private TDataNodeLocation coordinator;
   private TDataNodeLocation targetDataNode;
+  private transient int deleteOldRegionPeerAttempted = 0;
 
   public RemoveRegionPeerProcedure() {
     super();
@@ -129,23 +133,56 @@ public class RemoveRegionPeerProcedure extends 
RegionOperationProcedure<RemoveRe
               handler.submitDeleteOldRegionPeerTask(this.getProcId(), 
targetDataNode, regionId);
           setKillPoint(state);
           if (tsStatus.getCode() != SUCCESS_STATUS.getStatusCode()) {
+            deleteOldRegionPeerAttempted++;
+            if (deleteOldRegionPeerAttempted <= 
MAX_DELETE_OLD_REGION_PEER_RETRY) {
+              LOGGER.warn(
+                  "[pid{}][RemoveRegion] DELETE_OLD_REGION_PEER task submitted 
failed (attempt {}/{}), will retry after {}ms. {}",
+                  getProcId(),
+                  deleteOldRegionPeerAttempted,
+                  MAX_DELETE_OLD_REGION_PEER_RETRY + 1,
+                  DELETE_OLD_REGION_PEER_RETRY_INTERVAL_MS,
+                  regionId);
+              Thread.sleep(DELETE_OLD_REGION_PEER_RETRY_INTERVAL_MS);
+              setNextState(DELETE_OLD_REGION_PEER);
+              return Flow.HAS_MORE_STATE;
+            }
             LOGGER.warn(
-                "[pid{}][RemoveRegion] DELETE_OLD_REGION_PEER task submitted 
failed, procedure will continue. You should manually delete region file. {}",
+                "[pid{}][RemoveRegion] DELETE_OLD_REGION_PEER task submitted 
failed after {} attempts, procedure will continue. You should manually delete 
region file. {}",
                 getProcId(),
+                deleteOldRegionPeerAttempted,
                 regionId);
-            setNextState(REMOVE_REGION_LOCATION_CACHE);
+            setNextState(DROP_CONSENSUS_PIPES);
             return Flow.HAS_MORE_STATE;
           }
           TRegionMigrateResult deleteOldRegionPeerResult =
               handler.waitTaskFinish(this.getProcId(), targetDataNode);
           if (deleteOldRegionPeerResult.getTaskStatus() != 
TRegionMaintainTaskStatus.SUCCESS) {
+            deleteOldRegionPeerAttempted++;
+            if (deleteOldRegionPeerAttempted <= 
MAX_DELETE_OLD_REGION_PEER_RETRY) {
+              LOGGER.warn(
+                  "[pid{}][RemoveRegion] DELETE_OLD_REGION_PEER executed 
failed (attempt {}/{}), will retry after {}ms. {}",
+                  getProcId(),
+                  deleteOldRegionPeerAttempted,
+                  MAX_DELETE_OLD_REGION_PEER_RETRY + 1,
+                  DELETE_OLD_REGION_PEER_RETRY_INTERVAL_MS,
+                  regionId);
+              Thread.sleep(DELETE_OLD_REGION_PEER_RETRY_INTERVAL_MS);
+              setNextState(DELETE_OLD_REGION_PEER);
+              return Flow.HAS_MORE_STATE;
+            }
             LOGGER.warn(
-                "[pid{}][RemoveRegion] DELETE_OLD_REGION_PEER executed failed, 
procedure will continue. You should manually delete region file. {}",
+                "[pid{}][RemoveRegion] DELETE_OLD_REGION_PEER executed failed 
after {} attempts, procedure will continue. You should manually delete region 
file. {}",
                 getProcId(),
+                deleteOldRegionPeerAttempted,
                 regionId);
-            setNextState(REMOVE_REGION_LOCATION_CACHE);
+            setNextState(DROP_CONSENSUS_PIPES);
             return Flow.HAS_MORE_STATE;
           }
+          setNextState(DROP_CONSENSUS_PIPES);
+          break;
+        case DROP_CONSENSUS_PIPES:
+          handler.dropConsensusPipesForRemovePeer(regionId, targetDataNode);
+          setKillPoint(state);
           setNextState(REMOVE_REGION_LOCATION_CACHE);
           break;
         case REMOVE_REGION_LOCATION_CACHE:
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/AddRegionPeerState.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/AddRegionPeerState.java
index 0e3477626b6..43fb405c221 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/AddRegionPeerState.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/AddRegionPeerState.java
@@ -23,4 +23,5 @@ public enum AddRegionPeerState {
   CREATE_NEW_REGION_PEER,
   DO_ADD_REGION_PEER,
   UPDATE_REGION_LOCATION_CACHE,
+  CREATE_CONSENSUS_PIPES,
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java
index f8921092667..4ff90132f59 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java
@@ -35,5 +35,8 @@ public enum CreateRegionGroupsState {
   // For DataRegionGroups that use iot consensus protocol, select leader by 
the way
   ACTIVATE_REGION_GROUPS,
 
-  CREATE_REGION_GROUPS_FINISH
+  CREATE_REGION_GROUPS_FINISH,
+
+  // Create initial consensus pipes for IoTConsensusV2 DataRegionGroups.
+  CREATE_INITIAL_CONSENSUS_PIPES
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveRegionPeerState.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveRegionPeerState.java
index e9767972799..a9c463cbafe 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveRegionPeerState.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveRegionPeerState.java
@@ -24,4 +24,5 @@ public enum RemoveRegionPeerState {
   REMOVE_REGION_PEER,
   DELETE_OLD_REGION_PEER,
   REMOVE_REGION_LOCATION_CACHE,
+  DROP_CONSENSUS_PIPES,
 }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java
index da06c60a624..c0d7257183d 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.consensus.config;
 
 import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
-import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeDispatcher;
 import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeGuardian;
 import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeReceiver;
 import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeSelector;
@@ -244,7 +243,6 @@ public class PipeConsensusConfig {
     private final String extractorPluginName;
     private final String processorPluginName;
     private final String connectorPluginName;
-    private final ConsensusPipeDispatcher consensusPipeDispatcher;
     private final ConsensusPipeGuardian consensusPipeGuardian;
     private final ConsensusPipeSelector consensusPipeSelector;
     private final ReplicateProgressManager replicateProgressManager;
@@ -255,7 +253,6 @@ public class PipeConsensusConfig {
         String extractorPluginName,
         String processorPluginName,
         String connectorPluginName,
-        ConsensusPipeDispatcher consensusPipeDispatcher,
         ConsensusPipeGuardian consensusPipeGuardian,
         ConsensusPipeSelector consensusPipeSelector,
         ReplicateProgressManager replicateProgressManager,
@@ -264,7 +261,6 @@ public class PipeConsensusConfig {
       this.extractorPluginName = extractorPluginName;
       this.processorPluginName = processorPluginName;
       this.connectorPluginName = connectorPluginName;
-      this.consensusPipeDispatcher = consensusPipeDispatcher;
       this.consensusPipeGuardian = consensusPipeGuardian;
       this.consensusPipeSelector = consensusPipeSelector;
       this.replicateProgressManager = replicateProgressManager;
@@ -284,10 +280,6 @@ public class PipeConsensusConfig {
       return connectorPluginName;
     }
 
-    public ConsensusPipeDispatcher getConsensusPipeDispatcher() {
-      return consensusPipeDispatcher;
-    }
-
     public ConsensusPipeGuardian getConsensusPipeGuardian() {
       return consensusPipeGuardian;
     }
@@ -318,7 +310,6 @@ public class PipeConsensusConfig {
           BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName();
       private String connectorPluginName =
           BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName();
-      private ConsensusPipeDispatcher consensusPipeDispatcher = null;
       private ConsensusPipeGuardian consensusPipeGuardian = null;
       private ConsensusPipeSelector consensusPipeSelector = null;
       private ReplicateProgressManager replicateProgressManager = null;
@@ -340,12 +331,6 @@ public class PipeConsensusConfig {
         return this;
       }
 
-      public Pipe.Builder setConsensusPipeDispatcher(
-          ConsensusPipeDispatcher consensusPipeDispatcher) {
-        this.consensusPipeDispatcher = consensusPipeDispatcher;
-        return this;
-      }
-
       public Pipe.Builder setConsensusPipeGuardian(ConsensusPipeGuardian 
consensusPipeGuardian) {
         this.consensusPipeGuardian = consensusPipeGuardian;
         return this;
@@ -378,7 +363,6 @@ public class PipeConsensusConfig {
             extractorPluginName,
             processorPluginName,
             connectorPluginName,
-            consensusPipeDispatcher,
             consensusPipeGuardian,
             consensusPipeSelector,
             replicateProgressManager,
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
index 762f338ad96..33d73d673bf 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
@@ -51,8 +51,8 @@ import 
org.apache.iotdb.consensus.exception.IllegalPeerNumException;
 import 
org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
 import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
 import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeGuardian;
-import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeManager;
 import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
+import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeSelector;
 import org.apache.iotdb.consensus.pipe.service.PipeConsensusRPCService;
 import 
org.apache.iotdb.consensus.pipe.service.PipeConsensusRPCServiceProcessor;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -101,7 +101,7 @@ public class PipeConsensus implements IConsensus {
       new ConcurrentHashMap<>();
   private final ReentrantReadWriteLock stateMachineMapLock = new 
ReentrantReadWriteLock();
   private final PipeConsensusConfig config;
-  private final ConsensusPipeManager consensusPipeManager;
+  private final ConsensusPipeSelector consensusPipeSelector;
   private final ConsensusPipeGuardian consensusPipeGuardian;
   private final IClientManager<TEndPoint, AsyncPipeConsensusServiceClient> 
asyncClientManager;
   private final IClientManager<TEndPoint, SyncPipeConsensusServiceClient> 
syncClientManager;
@@ -114,10 +114,8 @@ public class PipeConsensus implements IConsensus {
     this.config = config.getPipeConsensusConfig();
     this.registry = registry;
     this.rpcService = new PipeConsensusRPCService(thisNode, 
config.getPipeConsensusConfig());
-    this.consensusPipeManager =
-        new ConsensusPipeManager(
-            config.getPipeConsensusConfig().getPipe(),
-            config.getPipeConsensusConfig().getReplicateMode());
+    this.consensusPipeSelector =
+        config.getPipeConsensusConfig().getPipe().getConsensusPipeSelector();
     this.consensusPipeGuardian =
         config.getPipeConsensusConfig().getPipe().getConsensusPipeGuardian();
     this.asyncClientManager =
@@ -177,7 +175,6 @@ public class PipeConsensus implements IConsensus {
                               registry.apply(consensusGroupId),
                               new ArrayList<>(),
                               config,
-                              consensusPipeManager,
                               syncClientManager);
                       stateMachineMap.put(consensusGroupId, consensus);
                       checkPeerListAndStartIfEligible(consensusGroupId, 
consensus);
@@ -220,14 +217,14 @@ public class PipeConsensus implements IConsensus {
         // make peers which are in list correct
         resetPeerListWithoutThrow.accept(
             consensusGroupId, 
correctPeerListBeforeStart.get(consensusGroupId));
-        consensus.start(true);
+        consensus.start();
       } else {
         // clear peers which are not in the list
         resetPeerListWithoutThrow.accept(consensusGroupId, 
Collections.emptyList());
       }
 
     } else {
-      consensus.start(true);
+      consensus.start();
     }
   }
 
@@ -243,7 +240,7 @@ public class PipeConsensus implements IConsensus {
 
   private void checkAllConsensusPipe() {
     final Map<ConsensusGroupId, Map<ConsensusPipeName, PipeStatus>> 
existedPipes =
-        consensusPipeManager.getAllConsensusPipe().entrySet().stream()
+        consensusPipeSelector.getAllConsensusPipe().entrySet().stream()
             .filter(entry -> entry.getKey().getSenderDataNodeId() == 
thisNodeId)
             .collect(
                 Collectors.groupingBy(
@@ -254,25 +251,16 @@ public class PipeConsensus implements IConsensus {
       stateMachineMap.forEach(
           (key, value) ->
               value.checkConsensusPipe(existedPipes.getOrDefault(key, 
ImmutableMap.of())));
+      // Log orphaned pipes (region no longer exists locally); ConfigNode 
handles actual cleanup.
       existedPipes.entrySet().stream()
           .filter(entry -> !stateMachineMap.containsKey(entry.getKey()))
           .flatMap(entry -> entry.getValue().keySet().stream())
           .forEach(
-              consensusPipeName -> {
-                try {
+              consensusPipeName ->
                   LOGGER.warn(
-                      "{} drop consensus pipe [{}]",
+                      "{} orphaned consensus pipe [{}] found, should be 
dropped by ConfigNode",
                       consensusPipeName.getConsensusGroupId(),
-                      consensusPipeName);
-                  consensusPipeManager.updateConsensusPipe(consensusPipeName, 
PipeStatus.DROPPED);
-                } catch (Exception e) {
-                  LOGGER.warn(
-                      "{} cannot drop consensus pipe [{}]",
-                      consensusPipeName.getConsensusGroupId(),
-                      consensusPipeName,
-                      e);
-                }
-              });
+                      consensusPipeName));
     } finally {
       stateMachineMapLock.writeLock().unlock();
     }
@@ -347,10 +335,9 @@ public class PipeConsensus implements IConsensus {
                 registry.apply(groupId),
                 peers,
                 config,
-                consensusPipeManager,
                 syncClientManager);
         stateMachineMap.put(groupId, consensus);
-        consensus.start(false); // pipe will start after creating
+        consensus.start();
         
KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_CREATE_LOCAL_PEER);
       } catch (IOException e) {
         LOGGER.warn("Cannot create local peer for group {} with peers {}", 
groupId, peers, e);
@@ -511,7 +498,7 @@ public class PipeConsensus implements IConsensus {
     for (Peer peer : correctPeers) {
       if (!impl.containsPeer(peer) && peer.getNodeId() != this.thisNodeId) {
         try {
-          impl.createConsensusPipeToTargetPeer(peer, false);
+          impl.createConsensusPipeToTargetPeer(peer);
           LOGGER.info("[RESET PEER LIST] {} Build sync channel with: {}", 
groupId, peer);
         } catch (ConsensusGroupModifyPeerException e) {
           LOGGER.warn(
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
index 23030b89bd4..7aed02e075f 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
@@ -40,7 +40,6 @@ import 
org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.config.PipeConsensusConfig;
 import org.apache.iotdb.consensus.config.PipeConsensusConfig.ReplicateMode;
 import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
-import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeManager;
 import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
 import org.apache.iotdb.consensus.pipe.consensuspipe.ReplicateProgressManager;
 import org.apache.iotdb.consensus.pipe.metric.PipeConsensusServerMetrics;
@@ -63,11 +62,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -79,8 +76,6 @@ public class PipeConsensusServerImpl {
   private static final long 
CHECK_TRANSMISSION_COMPLETION_INTERVAL_IN_MILLISECONDS = 2_000L;
   private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS 
=
       PerformanceOverviewMetrics.getInstance();
-  private static final long RETRY_WAIT_TIME_IN_MS = 500;
-  private static final long MAX_RETRY_TIMES = 20;
   private final Peer thisNode;
   private final IStateMachine stateMachine;
   private final Lock stateMachineLock = new ReentrantLock();
@@ -88,7 +83,6 @@ public class PipeConsensusServerImpl {
   private final AtomicBoolean active;
   private final AtomicBoolean isStarted;
   private final String consensusGroupId;
-  private final ConsensusPipeManager consensusPipeManager;
   private final ReplicateProgressManager replicateProgressManager;
   private final IClientManager<TEndPoint, SyncPipeConsensusServiceClient> 
syncClientManager;
   private final PipeConsensusServerMetrics pipeConsensusServerMetrics;
@@ -101,7 +95,6 @@ public class PipeConsensusServerImpl {
       IStateMachine stateMachine,
       List<Peer> peers,
       PipeConsensusConfig config,
-      ConsensusPipeManager consensusPipeManager,
       IClientManager<TEndPoint, SyncPipeConsensusServiceClient> 
syncClientManager)
       throws IOException {
     this.thisNode = thisNode;
@@ -110,85 +103,31 @@ public class PipeConsensusServerImpl {
     this.active = new AtomicBoolean(true);
     this.isStarted = new AtomicBoolean(false);
     this.consensusGroupId = thisNode.getGroupId().toString();
-    this.consensusPipeManager = consensusPipeManager;
     this.replicateProgressManager = config.getPipe().getProgressIndexManager();
     this.syncClientManager = syncClientManager;
     this.pipeConsensusServerMetrics = new PipeConsensusServerMetrics(this);
     this.replicateMode = config.getReplicateMode();
 
-    // if peers is empty, the `resetPeerList` will automatically fetch correct 
peers' info from CN.
-    if (!peers.isEmpty()) {
-      // create consensus pipes
-      Set<Peer> deepCopyPeersWithoutSelf =
-          peers.stream().filter(peer -> 
!peer.equals(thisNode)).collect(Collectors.toSet());
-      final List<Peer> successfulPipes = 
createConsensusPipes(deepCopyPeersWithoutSelf);
-      if (successfulPipes.size() < deepCopyPeersWithoutSelf.size()) {
-        // roll back
-        updateConsensusPipesStatus(successfulPipes, PipeStatus.DROPPED);
-        throw new IOException(String.format("%s cannot create all consensus 
pipes", thisNode));
-      }
-    }
+    // Consensus pipe creation is fully delegated to ConfigNode to avoid 
deadlocks between
+    // DataNode RPC handlers and ConfigNode's PipeTaskCoordinatorLock. 
ConfigNode proactively
+    // creates consensus pipes at key lifecycle points:
+    //   1. New DataRegion creation: via CreatePipeProcedureV2 in 
CreateRegionGroupsProcedure
+    //   2. Region migration addPeer: via CREATE_CONSENSUS_PIPES state in 
AddRegionPeerProcedure
   }
 
-  @SuppressWarnings("java:S2276")
-  public synchronized void start(boolean startConsensusPipes) throws 
IOException {
+  public synchronized void start() throws IOException {
     stateMachine.start();
     MetricService.getInstance().addMetricSet(this.pipeConsensusServerMetrics);
-
-    if (startConsensusPipes) {
-      // start all consensus pipes
-      final List<Peer> otherPeers = peerManager.getOtherPeers(thisNode);
-      List<Peer> failedPipes =
-          updateConsensusPipesStatus(new ArrayList<>(otherPeers), 
PipeStatus.RUNNING);
-      // considering procedure can easily time out, keep trying 
updateConsensusPipesStatus until all
-      // consensus pipes are started gracefully or exceed the maximum number 
of attempts.
-      // NOTE: start pipe procedure is idempotent guaranteed.
-      try {
-        for (int i = 0; i < MAX_RETRY_TIMES && !failedPipes.isEmpty(); i++) {
-          failedPipes = updateConsensusPipesStatus(failedPipes, 
PipeStatus.RUNNING);
-          Thread.sleep(RETRY_WAIT_TIME_IN_MS);
-        }
-      } catch (InterruptedException e) {
-        LOGGER.warn(
-            "PipeConsensusImpl-peer{}: pipeConsensusImpl thread get 
interrupted when start consensus pipe. May because IoTDB process is killed.",
-            thisNode);
-        throw new IOException(String.format("%s cannot start all consensus 
pipes", thisNode));
-      }
-      // if there still are some consensus pipes failed to start, throw an 
exception.
-      if (!failedPipes.isEmpty()) {
-        // roll back
-        List<Peer> successfulPipes = new ArrayList<>(otherPeers);
-        successfulPipes.removeAll(failedPipes);
-        updateConsensusPipesStatus(successfulPipes, PipeStatus.STOPPED);
-        throw new IOException(String.format("%s cannot start all consensus 
pipes", thisNode));
-      }
-    }
     isStarted.set(true);
   }
 
   public synchronized void stop() {
-    // stop all consensus pipes
-    final List<Peer> otherPeers = peerManager.getOtherPeers(thisNode);
-    final List<Peer> failedPipes =
-        updateConsensusPipesStatus(new ArrayList<>(otherPeers), 
PipeStatus.STOPPED);
-    if (!failedPipes.isEmpty()) {
-      // do not roll back, because it will stop anyway
-      LOGGER.warn("{} cannot stop all consensus pipes", thisNode);
-    }
     
MetricService.getInstance().removeMetricSet(this.pipeConsensusServerMetrics);
     stateMachine.stop();
     isStarted.set(false);
   }
 
   public synchronized void clear() {
-    final List<Peer> otherPeers = peerManager.getOtherPeers(thisNode);
-    final List<Peer> failedPipes =
-        updateConsensusPipesStatus(new ArrayList<>(otherPeers), 
PipeStatus.DROPPED);
-    if (!failedPipes.isEmpty()) {
-      // do not roll back, because it will clear anyway
-      LOGGER.warn("{} cannot drop all consensus pipes", thisNode);
-    }
-
     
MetricService.getInstance().removeMetricSet(this.pipeConsensusServerMetrics);
     peerManager.clear();
     stateMachine.stop();
@@ -196,55 +135,10 @@ public class PipeConsensusServerImpl {
     active.set(false);
   }
 
-  private List<Peer> createConsensusPipes(Set<Peer> peers) {
-    return peers.stream()
-        .filter(
-            peer -> {
-              try {
-                if (!peers.equals(thisNode)) {
-                  consensusPipeManager.createConsensusPipe(thisNode, peer);
-                }
-                return true;
-              } catch (Exception e) {
-                LOGGER.warn(
-                    "{}: cannot create consensus pipe between {} and {}",
-                    e.getMessage(),
-                    thisNode,
-                    peer,
-                    e);
-                return false;
-              }
-            })
-        .collect(Collectors.toList());
-  }
-
   /**
-   * update given consensus pipes' status, returns the peer corresponding to 
the pipe that failed to
-   * update
+   * Detect inconsistencies between expected and existed consensus pipes. 
Actual remediation
+   * (create/drop/update) is handled by ConfigNode; this method only logs 
warnings.
    */
-  private List<Peer> updateConsensusPipesStatus(List<Peer> peers, PipeStatus 
status) {
-    return peers.stream()
-        .filter(
-            peer -> {
-              try {
-                if (!peer.equals(thisNode)) {
-                  consensusPipeManager.updateConsensusPipe(
-                      new ConsensusPipeName(thisNode, peer), status);
-                }
-                return false;
-              } catch (Exception e) {
-                LOGGER.warn(
-                    "{}: cannot update consensus pipe between {} and {} to 
status {}",
-                    e.getMessage(),
-                    thisNode,
-                    peer,
-                    status);
-                return true;
-              }
-            })
-        .collect(Collectors.toList());
-  }
-
   public synchronized void checkConsensusPipe(Map<ConsensusPipeName, 
PipeStatus> existedPipes) {
     final PipeStatus expectedStatus = isStarted.get() ? PipeStatus.RUNNING : 
PipeStatus.STOPPED;
     final Map<ConsensusPipeName, Peer> expectedPipes =
@@ -256,54 +150,27 @@ public class PipeConsensusServerImpl {
     existedPipes.forEach(
         (existedName, existedStatus) -> {
           if (!expectedPipes.containsKey(existedName)) {
-            try {
-              LOGGER.warn("{} drop consensus pipe [{}]", consensusGroupId, 
existedName);
-              consensusPipeManager.updateConsensusPipe(existedName, 
PipeStatus.DROPPED);
-            } catch (Exception e) {
-              LOGGER.warn("{} cannot drop consensus pipe [{}]", 
consensusGroupId, existedName, e);
-            }
+            LOGGER.warn(
+                "{} unexpected consensus pipe [{}] exists, should be dropped 
by ConfigNode",
+                consensusGroupId,
+                existedName);
           } else if (!expectedStatus.equals(existedStatus)) {
-            try {
-              LOGGER.warn(
-                  "{} update consensus pipe [{}] to status {}",
-                  consensusGroupId,
-                  existedName,
-                  expectedStatus);
-              if (expectedStatus.equals(PipeStatus.RUNNING)) {
-                // Do nothing. Because Pipe framework's metaSync will do that.
-                return;
-              }
-              consensusPipeManager.updateConsensusPipe(existedName, 
expectedStatus);
-            } catch (Exception e) {
-              LOGGER.warn(
-                  "{} cannot update consensus pipe [{}] to status {}",
-                  consensusGroupId,
-                  existedName,
-                  expectedStatus,
-                  e);
-            }
+            LOGGER.warn(
+                "{} consensus pipe [{}] status mismatch: expected={}, 
actual={}",
+                consensusGroupId,
+                existedName,
+                expectedStatus,
+                existedStatus);
           }
         });
 
     expectedPipes.forEach(
         (expectedName, expectedPeer) -> {
           if (!existedPipes.containsKey(expectedName)) {
-            try {
-              LOGGER.warn(
-                  "{} create and update consensus pipe [{}] to status {}",
-                  consensusGroupId,
-                  expectedName,
-                  expectedStatus);
-              consensusPipeManager.createConsensusPipe(thisNode, expectedPeer);
-              consensusPipeManager.updateConsensusPipe(expectedName, 
expectedStatus);
-            } catch (Exception e) {
-              LOGGER.warn(
-                  "{} cannot create and update consensus pipe [{}] to status 
{}",
-                  consensusGroupId,
-                  expectedName,
-                  expectedStatus,
-                  e);
-            }
+            LOGGER.warn(
+                "{} consensus pipe [{}] missing, should be created by 
ConfigNode",
+                consensusGroupId,
+                expectedName);
           }
         });
   }
@@ -427,7 +294,7 @@ public class PipeConsensusServerImpl {
     try {
       // This node which acts as coordinator will transfer complete historical 
snapshot to new
       // target.
-      createConsensusPipeToTargetPeer(targetPeer, false);
+      createConsensusPipeToTargetPeer(targetPeer);
     } catch (Exception e) {
       LOGGER.warn(
           "{} cannot create consensus pipe to {}, may because target peer is 
unknown currently, please manually check!",
@@ -438,17 +305,11 @@ public class PipeConsensusServerImpl {
     }
   }
 
-  public synchronized void createConsensusPipeToTargetPeer(
-      Peer targetPeer, boolean needManuallyStart) throws 
ConsensusGroupModifyPeerException {
-    try {
-      KillPoint.setKillPoint(DataNodeKillPoints.ORIGINAL_ADD_PEER_DONE);
-      consensusPipeManager.createConsensusPipe(thisNode, targetPeer, 
needManuallyStart);
-      peerManager.addPeer(targetPeer);
-    } catch (Exception e) {
-      LOGGER.warn("{} cannot create consensus pipe to {}", thisNode, 
targetPeer, e);
-      throw new ConsensusGroupModifyPeerException(
-          String.format("%s cannot create consensus pipe to %s", thisNode, 
targetPeer), e);
-    }
+  public synchronized void createConsensusPipeToTargetPeer(Peer targetPeer)
+      throws ConsensusGroupModifyPeerException {
+    KillPoint.setKillPoint(DataNodeKillPoints.ORIGINAL_ADD_PEER_DONE);
+    // Pipe creation is delegated to ConfigNode; only update local peer list.
+    peerManager.addPeer(targetPeer);
   }
 
   public void notifyPeersToDropConsensusPipe(Peer targetPeer)
@@ -493,32 +354,8 @@ public class PipeConsensusServerImpl {
 
   public synchronized void dropConsensusPipeToTargetPeer(Peer targetPeer)
       throws ConsensusGroupModifyPeerException {
-    try {
-      consensusPipeManager.dropConsensusPipe(thisNode, targetPeer);
-      peerManager.removePeer(targetPeer);
-    } catch (Exception e) {
-      LOGGER.warn("{} cannot drop consensus pipe to {}", thisNode, targetPeer, 
e);
-      throw new ConsensusGroupModifyPeerException(
-          String.format("%s cannot drop consensus pipe to %s", thisNode, 
targetPeer), e);
-    }
-  }
-
-  public void startOtherConsensusPipesToTargetPeer(Peer targetPeer)
-      throws ConsensusGroupModifyPeerException {
-    final List<Peer> otherPeers = peerManager.getOtherPeers(thisNode);
-    for (Peer peer : otherPeers) {
-      if (peer.equals(targetPeer)) {
-        continue;
-      }
-      try {
-        consensusPipeManager.updateConsensusPipe(
-            new ConsensusPipeName(peer, targetPeer), PipeStatus.RUNNING);
-      } catch (Exception e) {
-        // just warn but not throw exceptions. Because there may exist unknown 
nodes in consensus
-        // group
-        LOGGER.warn("{} cannot start consensus pipe to {}", peer, targetPeer, 
e);
-      }
-    }
+    // Pipe drop is delegated to ConfigNode; only update local peer list.
+    peerManager.removePeer(targetPeer);
   }
 
   /** Wait for the user written data up to firstCheck to be replicated */
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeDispatcher.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeDispatcher.java
deleted file mode 100644
index 568f68bb577..00000000000
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeDispatcher.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.consensus.pipe.consensuspipe;
-
-import java.util.Map;
-
-public interface ConsensusPipeDispatcher {
-  void createPipe(
-      String pipeName,
-      Map<String, String> extractorAttributes,
-      Map<String, String> processorAttributes,
-      Map<String, String> connectorAttributes,
-      boolean needManuallyStart)
-      throws Exception;
-
-  void startPipe(String pipeName) throws Exception;
-
-  void stopPipe(String pipeName) throws Exception;
-
-  /**
-   * Use ConsensusPipeName instead of String to provide information for 
receiverAgent to release
-   * corresponding resource
-   */
-  void dropPipe(ConsensusPipeName pipeName) throws Exception;
-}
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
deleted file mode 100644
index c1aef74a4b4..00000000000
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.consensus.pipe.consensuspipe;
-
-import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
-import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.config.PipeConsensusConfig;
-import org.apache.iotdb.consensus.config.PipeConsensusConfig.ReplicateMode;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.tsfile.external.commons.lang3.tuple.ImmutableTriple;
-import org.apache.tsfile.external.commons.lang3.tuple.Triple;
-
-import java.util.Map;
-
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_CONSENSUS_GROUP_ID_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_CONSENSUS_PIPE_NAME;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_IP_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PORT_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_CAPTURE_TABLE_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_CAPTURE_TREE_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_CONSENSUS_GROUP_ID_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_CONSENSUS_SENDER_DATANODE_ID_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_INCLUSION_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_MODE_KEY;
-
-public class ConsensusPipeManager {
-  // Extract data.insert and data.delete to support deletion.
-  private static final String CONSENSUS_EXTRACTOR_INCLUSION_VALUE = "data";
-  private final PipeConsensusConfig.Pipe config;
-  private final ReplicateMode replicateMode;
-  private final ConsensusPipeDispatcher dispatcher;
-  private final ConsensusPipeSelector selector;
-
-  public ConsensusPipeManager(PipeConsensusConfig.Pipe config, ReplicateMode 
replicateMode) {
-    this.config = config;
-    this.replicateMode = replicateMode;
-    this.dispatcher = config.getConsensusPipeDispatcher();
-    this.selector = config.getConsensusPipeSelector();
-  }
-
-  /** This method is used except region migration. */
-  public void createConsensusPipe(Peer senderPeer, Peer receiverPeer) throws 
Exception {
-    ConsensusPipeName consensusPipeName = new ConsensusPipeName(senderPeer, 
receiverPeer);
-    // The third parameter is only used when region migration. Since this 
method is not called by
-    // region migration, just pass senderPeer in to get the correct result.
-    Triple<ImmutableMap<String, String>, ImmutableMap<String, String>, 
ImmutableMap<String, String>>
-        params = buildPipeParams(senderPeer, receiverPeer);
-    dispatcher.createPipe(
-        consensusPipeName.toString(),
-        params.getLeft(),
-        params.getMiddle(),
-        params.getRight(),
-        false);
-  }
-
-  /** This method is used when executing region migration */
-  public void createConsensusPipe(Peer senderPeer, Peer receiverPeer, boolean 
needManuallyStart)
-      throws Exception {
-    ConsensusPipeName consensusPipeName = new ConsensusPipeName(senderPeer, 
receiverPeer);
-    Triple<ImmutableMap<String, String>, ImmutableMap<String, String>, 
ImmutableMap<String, String>>
-        params = buildPipeParams(senderPeer, receiverPeer);
-    dispatcher.createPipe(
-        consensusPipeName.toString(),
-        params.getLeft(),
-        params.getMiddle(),
-        params.getRight(),
-        needManuallyStart);
-  }
-
-  public Triple<
-          ImmutableMap<String, String>, ImmutableMap<String, String>, 
ImmutableMap<String, String>>
-      buildPipeParams(final Peer senderPeer, final Peer receiverPeer) {
-    final ConsensusPipeName consensusPipeName = new 
ConsensusPipeName(senderPeer, receiverPeer);
-    return new ImmutableTriple<>(
-        ImmutableMap.<String, String>builder()
-            .put(EXTRACTOR_KEY, config.getExtractorPluginName())
-            .put(EXTRACTOR_INCLUSION_KEY, CONSENSUS_EXTRACTOR_INCLUSION_VALUE)
-            .put(
-                EXTRACTOR_CONSENSUS_GROUP_ID_KEY,
-                consensusPipeName.getConsensusGroupId().toString())
-            .put(
-                EXTRACTOR_CONSENSUS_SENDER_DATANODE_ID_KEY,
-                String.valueOf(consensusPipeName.getSenderDataNodeId()))
-            .put(
-                EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY,
-                String.valueOf(consensusPipeName.getReceiverDataNodeId()))
-            .put(EXTRACTOR_REALTIME_MODE_KEY, replicateMode.getValue())
-            .put(EXTRACTOR_CAPTURE_TABLE_KEY, String.valueOf(true))
-            .put(EXTRACTOR_CAPTURE_TREE_KEY, String.valueOf(true))
-            .put(
-                EXTRACTOR_IOTDB_USER_KEY,
-                
CommonDescriptor.getInstance().getConfig().getDefaultAdminName())
-            .build(),
-        ImmutableMap.<String, String>builder()
-            .put(PROCESSOR_KEY, config.getProcessorPluginName())
-            .build(),
-        ImmutableMap.<String, String>builder()
-            .put(CONNECTOR_KEY, config.getConnectorPluginName())
-            .put(
-                CONNECTOR_CONSENSUS_GROUP_ID_KEY,
-                
String.valueOf(consensusPipeName.getConsensusGroupId().getId()))
-            .put(CONNECTOR_CONSENSUS_PIPE_NAME, consensusPipeName.toString())
-            .put(CONNECTOR_IOTDB_IP_KEY, receiverPeer.getEndpoint().ip)
-            .put(CONNECTOR_IOTDB_PORT_KEY, 
String.valueOf(receiverPeer.getEndpoint().port))
-            .put(CONNECTOR_IOTDB_PARALLEL_TASKS_KEY, String.valueOf(1))
-            .put(CONNECTOR_REALTIME_FIRST_KEY, String.valueOf(false))
-            .build());
-  }
-
-  public void dropConsensusPipe(Peer senderPeer, Peer receiverPeer) throws 
Exception {
-    ConsensusPipeName consensusPipeName = new ConsensusPipeName(senderPeer, 
receiverPeer);
-    dispatcher.dropPipe(consensusPipeName);
-  }
-
-  public void updateConsensusPipe(ConsensusPipeName consensusPipeName, 
PipeStatus pipeStatus)
-      throws Exception {
-    if (PipeStatus.RUNNING.equals(pipeStatus)) {
-      dispatcher.startPipe(consensusPipeName.toString());
-    } else if (PipeStatus.STOPPED.equals(pipeStatus)) {
-      dispatcher.stopPipe(consensusPipeName.toString());
-    } else if (PipeStatus.DROPPED.equals(pipeStatus)) {
-      dispatcher.dropPipe(consensusPipeName);
-    } else {
-      throw new IllegalArgumentException("Unsupported pipe status: " + 
pipeStatus);
-    }
-  }
-
-  public Map<ConsensusPipeName, PipeStatus> getAllConsensusPipe() {
-    return selector.getAllConsensusPipe();
-  }
-}
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java
index 3aa69af6ff5..b99abc27dfc 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java
@@ -125,8 +125,7 @@ public class PipeConsensusRPCServiceProcessor implements 
PipeConsensusIService.I
           new Peer(
               
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.targetPeerConsensusGroupId),
               req.targetPeerNodeId,
-              req.targetPeerEndPoint),
-          false);
+              req.targetPeerEndPoint));
       responseStatus = new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     } catch (ConsensusGroupModifyPeerException e) {
       responseStatus = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index 700fd79e5eb..a33cdd11024 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -44,7 +44,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.consensus.statemachine.dataregion.DataRegionStateMachine;
 import 
org.apache.iotdb.db.consensus.statemachine.dataregion.IoTConsensusDataRegionStateMachine;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
-import org.apache.iotdb.db.pipe.consensus.ConsensusPipeDataNodeDispatcher;
 import 
org.apache.iotdb.db.pipe.consensus.ConsensusPipeDataNodeRuntimeAgentGuardian;
 import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager;
 import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
@@ -188,7 +187,6 @@ public class DataRegionConsensusImpl {
                           .setConnectorPluginName(
                               
BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName())
                           // name
-                          .setConsensusPipeDispatcher(new 
ConsensusPipeDataNodeDispatcher())
                           .setConsensusPipeGuardian(new 
ConsensusPipeDataNodeRuntimeAgentGuardian())
                           .setConsensusPipeSelector(
                               () -> 
PipeDataNodeAgent.task().getAllConsensusPipe())
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeDispatcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeDispatcher.java
deleted file mode 100644
index 02179a29f56..00000000000
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeDispatcher.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.consensus;
-
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.client.IClientManager;
-import org.apache.iotdb.commons.consensus.ConfigRegionId;
-import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
-import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeDispatcher;
-import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
-import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
-import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
-import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
-import org.apache.iotdb.pipe.api.exception.PipeException;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeRPCMessageConstant.PIPE_ALREADY_EXIST_MSG;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeRPCMessageConstant.PIPE_NOT_EXIST_MSG;
-
-public class ConsensusPipeDataNodeDispatcher implements 
ConsensusPipeDispatcher {
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(ConsensusPipeDataNodeDispatcher.class);
-
-  private static final IClientManager<ConfigRegionId, ConfigNodeClient> 
CONFIG_NODE_CLIENT_MANAGER =
-      ConfigNodeClientManager.getInstance();
-
-  @Override
-  public void createPipe(
-      String pipeName,
-      Map<String, String> extractorAttributes,
-      Map<String, String> processorAttributes,
-      Map<String, String> connectorAttributes,
-      boolean needManuallyStart)
-      throws Exception {
-    try (ConfigNodeClient configNodeClient =
-        
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
-      TCreatePipeReq req =
-          new TCreatePipeReq()
-              .setPipeName(pipeName)
-              .setNeedManuallyStart(needManuallyStart)
-              .setExtractorAttributes(extractorAttributes)
-              .setProcessorAttributes(processorAttributes)
-              .setConnectorAttributes(connectorAttributes);
-      TSStatus status = configNodeClient.createPipe(req);
-      if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != status.getCode()) {
-        LOGGER.warn("Failed to create consensus pipe-{}, status: {}", 
pipeName, status);
-        // ignore idempotence logic
-        if (status.getMessage().contains(PIPE_ALREADY_EXIST_MSG)) {
-          return;
-        }
-        throw new PipeException(status.getMessage());
-      }
-    } catch (Exception e) {
-      LOGGER.warn("Failed to create consensus pipe-{}", pipeName, e);
-      throw new PipeException("Failed to create consensus pipe", e);
-    }
-  }
-
-  @Override
-  public void startPipe(String pipeName) throws Exception {
-    try (ConfigNodeClient configNodeClient =
-        
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
-      TSStatus status = configNodeClient.startPipe(pipeName);
-      if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != status.getCode()) {
-        LOGGER.warn("Failed to start consensus pipe-{}, status: {}", pipeName, 
status);
-        throw new PipeException(status.getMessage());
-      }
-    } catch (Exception e) {
-      LOGGER.warn("Failed to start consensus pipe-{}", pipeName, e);
-      throw new PipeException("Failed to start consensus pipe", e);
-    }
-  }
-
-  @Override
-  public void stopPipe(String pipeName) throws Exception {
-    try (ConfigNodeClient configNodeClient =
-        
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
-      final TSStatus status = configNodeClient.stopPipe(pipeName);
-      if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != status.getCode()) {
-        LOGGER.warn("Failed to stop consensus pipe-{}, status: {}", pipeName, 
status);
-        throw new PipeException(status.getMessage());
-      }
-    } catch (Exception e) {
-      LOGGER.warn("Failed to stop consensus pipe-{}", pipeName, e);
-      throw new PipeException("Failed to stop consensus pipe", e);
-    }
-  }
-
-  // Use ConsensusPipeName instead of String to provide information for 
receiverAgent to release
-  // corresponding resource
-  @Override
-  public void dropPipe(ConsensusPipeName pipeName) throws Exception {
-    try (ConfigNodeClient configNodeClient =
-        
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
-      final TSStatus status = configNodeClient.dropPipe(pipeName.toString());
-      if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != status.getCode()) {
-        LOGGER.warn("Failed to drop consensus pipe-{}, status: {}", pipeName, 
status);
-        // ignore idempotence logic
-        if (status.getMessage().contains(PIPE_NOT_EXIST_MSG)) {
-          return;
-        }
-        throw new PipeException(status.getMessage());
-      }
-    } catch (Exception e) {
-      LOGGER.warn("Failed to drop consensus pipe-{}", pipeName, e);
-      throw new PipeException("Failed to drop consensus pipe", e);
-    }
-  }
-}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index eaa4fa0a4e9..2d7cbe18358 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -303,26 +303,6 @@ public class DataNode extends ServerCommandLine implements 
DataNodeMBean {
                 thisNode);
         DNAuditLogger.getInstance().log(fields, () -> logMessage);
       }
-
-      if (isUsingPipeConsensus()) {
-        long dataRegionStartTime = System.currentTimeMillis();
-        while (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) 
{
-          try {
-            TimeUnit.MILLISECONDS.sleep(1000);
-          } catch (InterruptedException e) {
-            logger.warn("IoTDB DataNode failed to set up.", e);
-            Thread.currentThread().interrupt();
-            return;
-          }
-        }
-        DataRegionConsensusImpl.getInstance().start();
-        long dataRegionEndTime = System.currentTimeMillis();
-        logger.info(
-            "DataRegion consensus start successfully, which takes {} ms.",
-            (dataRegionEndTime - dataRegionStartTime));
-        dataRegionConsensusStarted = true;
-      }
-
     } catch (Throwable e) {
       int exitStatusCode = retrieveExitStatusCode(e);
       logger.error("Fail to start server", e);
@@ -770,11 +750,11 @@ public class DataNode extends ServerCommandLine 
implements DataNodeMBean {
    *
    * @throws StartupException if start up failed.
    */
-  private void active() throws StartupException {
+  private void active() throws StartupException, IOException {
     try {
       processPid();
       setUp();
-    } catch (StartupException e) {
+    } catch (StartupException | IOException e) {
       logger.error("Meet error while starting up.", e);
       throw e;
     }
@@ -808,7 +788,7 @@ public class DataNode extends ServerCommandLine implements 
DataNodeMBean {
     }
   }
 
-  private void setUp() throws StartupException {
+  private void setUp() throws StartupException, IOException {
     logger.info("Setting up IoTDB DataNode...");
     registerManager.register(new JMXService());
     JMXService.registerMBean(getInstance(), mbeanName);
@@ -863,6 +843,33 @@ public class DataNode extends ServerCommandLine implements 
DataNodeMBean {
 
     registerManager.register(CompactionTaskManager.getInstance());
 
+    // Start PipeConsensus (DataRegionConsensus) before Internal RPC Service and Pipe Agent
+    // recovery.
+    // This ensures consensus groups are registered so that deleteLocalPeer() can succeed when
+    // DELETE_OLD_REGION_PEER requests arrive during the pipe recovery phase.
+    if (isUsingPipeConsensus()) {
+      long dataRegionStartTime = System.currentTimeMillis();
+      while (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) {
+        try {
+          TimeUnit.MILLISECONDS.sleep(1000);
+        } catch (InterruptedException e) {
+          logger.warn("IoTDB DataNode failed to set up.", e);
+          Thread.currentThread().interrupt();
+          return;
+        }
+      }
+      DataRegionConsensusImpl.getInstance().start();
+      long dataRegionEndTime = System.currentTimeMillis();
+      logger.info(
+          "DataRegion consensus start successfully, which takes {} ms.",
+          (dataRegionEndTime - dataRegionStartTime));
+      dataRegionConsensusStarted = true;
+    }
+
+    // Start Internal RPC Service before pipe agent recovery, so that the DataNode can accept
+    // cluster scheduling requests (e.g. DELETE_OLD_REGION_PEER) while pipe recovery is in progress.
+    registerInternalRPCService();
+
     // Register subscription agent before pipe agent
     registerManager.register(SubscriptionAgent.runtime());
     registerManager.register(PipeDataNodeAgent.runtime());
@@ -873,12 +880,8 @@ public class DataNode extends ServerCommandLine implements 
DataNodeMBean {
 
   /** Set up RPC and protocols after DataNode is available */
   private void setUpRPCService() throws StartupException {
-
-    registerInternalRPCService();
-
-    // Notice: During the period between starting the internal RPC service
-    // and starting the client RPC service , some requests may fail because
-    // DataNode is not marked as RUNNING by ConfigNode-leader yet.
+    // Internal RPC Service is already started in setUp() before pipe agent recovery,
+    // so we only need to start client RPC and protocols here.
 
     // Start client RPCService to indicate that the current DataNode provide external services
     IoTDBDescriptor.getInstance()

Reply via email to