This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b34b714c80e [IoTConsensusV2 X Region Migration] Start consensus pipe in proper time rather than automatically start it when create it
b34b714c80e is described below

commit b34b714c80e386e1f4ff37e277b0443a1a9f6f10
Author: Peng Junzhi <[email protected]>
AuthorDate: Thu Nov 28 10:31:24 2024 +0800

    [IoTConsensusV2 X Region Migration] Start consensus pipe in proper time rather than automatically start it when create it
---
 .../impl/pipe/task/CreatePipeProcedureV2.java      |  4 +++-
 .../apache/iotdb/consensus/pipe/PipeConsensus.java | 13 ++++++++----
 .../consensus/pipe/PipeConsensusServerImpl.java    | 24 +++++++++++++++++++---
 .../consensuspipe/ConsensusPipeDispatcher.java     |  3 ++-
 .../pipe/consensuspipe/ConsensusPipeManager.java   | 18 +++++++++++++---
 .../service/PipeConsensusRPCServiceProcessor.java  |  3 ++-
 .../consensus/ConsensusPipeDataNodeDispatcher.java |  4 +++-
 .../src/main/thrift/confignode.thrift              |  1 +
 8 files changed, 56 insertions(+), 14 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index c552191a856..c08c9a2a2b7 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -203,7 +203,9 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
     }
 
     pipeRuntimeMeta = new PipeRuntimeMeta(consensusGroupIdToTaskMetaMap);
-    pipeRuntimeMeta.getStatus().set(PipeStatus.RUNNING);
+    if (!createPipeRequest.needManuallyStart) {
+      pipeRuntimeMeta.getStatus().set(PipeStatus.RUNNING);
+    }
   }
 
   @Override
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
index 081085f2834..358b34fb98d 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
@@ -332,9 +332,8 @@ public class PipeConsensus implements IConsensus {
       impl.setRemotePeerActive(peer, false);
 
       // step 2: notify all the other Peers to create consensus pipes to newPeer
-      // NOTE: For this step, coordinator(thisNode) will transfer its full data snapshot to target,
-      // while other peers will only transmit data(may contain both historical and realtime data)
-      // after the snapshot progress to target.
+      // NOTE: For this step, coordinator(thisNode) will transfer its full data snapshot to target
+      // while other peers record the coordinator's progress.
       LOGGER.info("[{}] notify current peers to create consensus pipes...", CLASS_NAME);
       impl.notifyPeersToCreateConsensusPipes(peer, impl.getThisNodePeer());
       KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_TRANSITION);
@@ -343,7 +342,13 @@ public class PipeConsensus implements IConsensus {
       LOGGER.info("[{}] wait until all the other peers finish transferring...", CLASS_NAME);
       impl.waitPeersToTargetPeerTransmissionCompleted(peer);
 
-      // step 4: active new Peer to let new Peer receive snapshot
+      // step 4. start other peers' consensus pipe to target peer to transfer remaining data
+      // NOTE: For this step, other peers will start to transmit data(may contain both historical
+      // and realtime data) after the snapshot progress to target.
+      LOGGER.info("[{}] start transfer remaining data from other peers", CLASS_NAME);
+      impl.startOtherConsensusPipesToTargetPeer(peer);
+
+      // step 5: active new Peer to let new Peer receive client requests
       LOGGER.info("[{}] activate new peer...", CLASS_NAME);
       impl.setRemotePeerActive(peer, true);
       KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_DONE);
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
index 882cca72c25..4c2855dc412 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
@@ -425,7 +425,7 @@ public class PipeConsensusServerImpl {
     try {
       // This node which acts as coordinator will transfer complete historical snapshot to new
       // target.
-      createConsensusPipeToTargetPeer(targetPeer, thisNode);
+      createConsensusPipeToTargetPeer(targetPeer, thisNode, false);
     } catch (Exception e) {
       LOGGER.warn(
           "{} cannot create consensus pipe to {}, may because target peer is unknown currently, please manually check!",
@@ -437,11 +437,11 @@ public class PipeConsensusServerImpl {
   }
 
   public synchronized void createConsensusPipeToTargetPeer(
-      Peer targetPeer, Peer regionMigrationCoordinatorPeer)
+      Peer targetPeer, Peer regionMigrationCoordinatorPeer, boolean needManuallyStart)
       throws ConsensusGroupModifyPeerException {
     try {
       consensusPipeManager.createConsensusPipe(
-          thisNode, targetPeer, regionMigrationCoordinatorPeer);
+          thisNode, targetPeer, regionMigrationCoordinatorPeer, needManuallyStart);
       peerManager.addAndPersist(targetPeer);
     } catch (IOException e) {
       LOGGER.warn("{} cannot persist peer {}", thisNode, targetPeer, e);
@@ -510,6 +510,24 @@ public class PipeConsensusServerImpl {
     }
   }
 
+  public void startOtherConsensusPipesToTargetPeer(Peer targetPeer)
+      throws ConsensusGroupModifyPeerException {
+    final List<Peer> otherPeers = peerManager.getOtherPeers(thisNode);
+    for (Peer peer : otherPeers) {
+      if (peer.equals(targetPeer)) {
+        continue;
+      }
+      try {
+        consensusPipeManager.updateConsensusPipe(
+            new ConsensusPipeName(peer, targetPeer), PipeStatus.RUNNING);
+      } catch (Exception e) {
+        LOGGER.warn("{} cannot start consensus pipe to {}", peer, targetPeer, e);
+        throw new ConsensusGroupModifyPeerException(
+            String.format("%s cannot start consensus pipe to %s", peer, targetPeer), e);
+      }
+    }
+  }
+
   public void waitPeersToTargetPeerTransmissionCompleted(Peer targetPeer)
       throws ConsensusGroupModifyPeerException {
     boolean isTransmissionCompleted = false;
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeDispatcher.java
index ac7045a5514..568f68bb577 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeDispatcher.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeDispatcher.java
@@ -26,7 +26,8 @@ public interface ConsensusPipeDispatcher {
       String pipeName,
       Map<String, String> extractorAttributes,
       Map<String, String> processorAttributes,
-      Map<String, String> connectorAttributes)
+      Map<String, String> connectorAttributes,
+      boolean needManuallyStart)
       throws Exception;
 
   void startPipe(String pipeName) throws Exception;
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
index aed0155e9c3..f5de22508ba 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
@@ -71,17 +71,29 @@ public class ConsensusPipeManager {
     Triple<ImmutableMap<String, String>, ImmutableMap<String, String>, ImmutableMap<String, String>>
         params = buildPipeParams(senderPeer, receiverPeer, senderPeer);
     dispatcher.createPipe(
-        consensusPipeName.toString(), params.getLeft(), params.getMiddle(), params.getRight());
+        consensusPipeName.toString(),
+        params.getLeft(),
+        params.getMiddle(),
+        params.getRight(),
+        false);
   }
 
   /** This method is used when executing region migration */
   public void createConsensusPipe(
-      Peer senderPeer, Peer receiverPeer, Peer regionMigrationCoordinatorPeer) throws Exception {
+      Peer senderPeer,
+      Peer receiverPeer,
+      Peer regionMigrationCoordinatorPeer,
+      boolean needManuallyStart)
+      throws Exception {
     ConsensusPipeName consensusPipeName = new ConsensusPipeName(senderPeer, receiverPeer);
     Triple<ImmutableMap<String, String>, ImmutableMap<String, String>, ImmutableMap<String, String>>
         params = buildPipeParams(senderPeer, receiverPeer, regionMigrationCoordinatorPeer);
     dispatcher.createPipe(
-        consensusPipeName.toString(), params.getLeft(), params.getMiddle(), params.getRight());
+        consensusPipeName.toString(),
+        params.getLeft(),
+        params.getMiddle(),
+        params.getRight(),
+        needManuallyStart);
   }
 
   public Triple<
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java
index cfd7a464ceb..378ffa6f78d 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java
@@ -117,7 +117,8 @@ public class PipeConsensusRPCServiceProcessor implements PipeConsensusIService.I
           new Peer(
               ConsensusGroupId.Factory.createFromTConsensusGroupId(req.targetPeerConsensusGroupId),
               req.coordinatorPeerNodeId,
-              req.coordinatorPeerEndPoint));
+              req.coordinatorPeerEndPoint),
+          true);
       responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     } catch (ConsensusGroupModifyPeerException e) {
       responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeDispatcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeDispatcher.java
index 9a95d833788..545d068e53a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeDispatcher.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeDispatcher.java
@@ -49,13 +49,15 @@ public class ConsensusPipeDataNodeDispatcher implements ConsensusPipeDispatcher
       String pipeName,
       Map<String, String> extractorAttributes,
       Map<String, String> processorAttributes,
-      Map<String, String> connectorAttributes)
+      Map<String, String> connectorAttributes,
+      boolean needManuallyStart)
       throws Exception {
     try (ConfigNodeClient configNodeClient =
         CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
       TCreatePipeReq req =
           new TCreatePipeReq()
               .setPipeName(pipeName)
+              .setNeedManuallyStart(needManuallyStart)
               .setExtractorAttributes(extractorAttributes)
               .setProcessorAttributes(processorAttributes)
               .setConnectorAttributes(connectorAttributes);
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index 7fdf78e7152..dc647955443 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -737,6 +737,7 @@ struct TCreatePipeReq {
     3: optional map<string, string> processorAttributes
     4: required map<string, string> connectorAttributes
     5: optional bool ifNotExistsCondition
+    6: optional bool needManuallyStart
 }
 
 struct TAlterPipeReq {

Reply via email to