This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c789ba8e555 fix addRemotePeer data inconsistency (#14332)
c789ba8e555 is described below

commit c789ba8e5555d8bacaad31b7089d9b25058a647f
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Dec 6 11:13:24 2024 +0800

    fix addRemotePeer data inconsistency (#14332)
---
 .../apache/iotdb/consensus/pipe/PipeConsensus.java | 15 ++---
 .../consensus/pipe/PipeConsensusServerImpl.java    | 47 +++++++++-----
 .../pipe/consensuspipe/ConsensusPipeManager.java   | 72 ++++++----------------
 .../service/PipeConsensusRPCServiceProcessor.java  |  6 +-
 ...oricalDataRegionTsFileAndDeletionExtractor.java | 20 +-----
 .../config/constant/PipeExtractorConstant.java     |  2 -
 .../src/main/thrift/pipeconsensus.thrift           |  2 -
 7 files changed, 57 insertions(+), 107 deletions(-)

diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
index 6d3417953d4..df47567e445 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
@@ -332,23 +332,16 @@ public class PipeConsensus implements IConsensus {
       impl.setRemotePeerActive(peer, false, false);
 
       // step 2: notify all the other Peers to create consensus pipes to newPeer
-      // NOTE: For this step, coordinator(thisNode) will transfer its full data snapshot to target
-      // while other peers record the coordinator's progress.
+      // NOTE: For this step, all the other peers will try to transfer its user write data to target
       LOGGER.info("[{}] notify current peers to create consensus pipes...", CLASS_NAME);
-      impl.notifyPeersToCreateConsensusPipes(peer, impl.getThisNodePeer());
+      impl.notifyPeersToCreateConsensusPipes(peer);
       KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_TRANSITION);
 
-      // step 3: wait until the coordinator Peer finishes transferring snapshot
+      // step 3: wait until all other Peers finish transferring
       LOGGER.info("[{}] wait until all the other peers finish transferring...", CLASS_NAME);
       impl.waitPeersToTargetPeerTransmissionCompleted(peer);
 
-      // step 4. start other peers' consensus pipe to target peer to transfer remaining data
-      // NOTE: For this step, other peers will start to transmit data(may contain both historical
-      // and realtime data) after the snapshot progress to target.
-      LOGGER.info("[{}] start transfer remaining data from other peers", CLASS_NAME);
-      impl.startOtherConsensusPipesToTargetPeer(peer);
-
-      // step 5: active new Peer to let new Peer receive client requests
+      // step 4: active new Peer to let new Peer receive client requests
       LOGGER.info("[{}] activate new peer...", CLASS_NAME);
       impl.setRemotePeerActive(peer, true, false);
       KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_DONE);
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
index 3a278a80c3f..b085abbd64b 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
@@ -401,10 +401,13 @@ public class PipeConsensusServerImpl {
     }
   }
 
-  public void notifyPeersToCreateConsensusPipes(Peer targetPeer, Peer coordinatorPeer)
+  public void notifyPeersToCreateConsensusPipes(Peer targetPeer)
       throws ConsensusGroupModifyPeerException {
     final List<Peer> otherPeers = peerManager.getOtherPeers(thisNode);
     for (Peer peer : otherPeers) {
+      if (peer.equals(targetPeer)) {
+        continue;
+      }
       try (SyncPipeConsensusServiceClient client =
           syncClientManager.borrowClient(peer.getEndpoint())) {
         TNotifyPeerToCreateConsensusPipeResp resp =
@@ -412,9 +415,7 @@ public class PipeConsensusServerImpl {
                 new TNotifyPeerToCreateConsensusPipeReq(
                     targetPeer.getGroupId().convertToTConsensusGroupId(),
                     targetPeer.getEndpoint(),
-                    targetPeer.getNodeId(),
-                    coordinatorPeer.getEndpoint(),
-                    coordinatorPeer.getNodeId()));
+                    targetPeer.getNodeId()));
         if (!RpcUtils.SUCCESS_STATUS.equals(resp.getStatus())) {
           throw new ConsensusGroupModifyPeerException(
               String.format("error when notify peer %s to create consensus pipe", peer));
@@ -431,7 +432,7 @@ public class PipeConsensusServerImpl {
     try {
       // This node which acts as coordinator will transfer complete historical snapshot to new
       // target.
-      createConsensusPipeToTargetPeer(targetPeer, thisNode, false);
+      createConsensusPipeToTargetPeer(targetPeer, false);
     } catch (Exception e) {
       LOGGER.warn(
           "{} cannot create consensus pipe to {}, may because target peer is unknown currently, please manually check!",
@@ -443,12 +444,10 @@ public class PipeConsensusServerImpl {
   }
 
   public synchronized void createConsensusPipeToTargetPeer(
-      Peer targetPeer, Peer regionMigrationCoordinatorPeer, boolean needManuallyStart)
-      throws ConsensusGroupModifyPeerException {
+      Peer targetPeer, boolean needManuallyStart) throws ConsensusGroupModifyPeerException {
     try {
       KillPoint.setKillPoint(DataNodeKillPoints.ORIGINAL_ADD_PEER_DONE);
-      consensusPipeManager.createConsensusPipe(
-          thisNode, targetPeer, regionMigrationCoordinatorPeer, needManuallyStart);
+      consensusPipeManager.createConsensusPipe(thisNode, targetPeer, needManuallyStart);
       peerManager.addAndPersist(targetPeer);
     } catch (IOException e) {
       LOGGER.warn("{} cannot persist peer {}", thisNode, targetPeer, e);
@@ -535,22 +534,35 @@ public class PipeConsensusServerImpl {
     }
   }
 
+  /** Wait for the user written data up to firstCheck to be replicated */
   public void waitPeersToTargetPeerTransmissionCompleted(Peer targetPeer)
       throws ConsensusGroupModifyPeerException {
     boolean isTransmissionCompleted = false;
-    boolean isFirstCheck = true;
+    boolean isFirstCheckForCurrentPeer = true;
+    boolean isFirstCheckForOtherPeers = true;
 
     try {
       while (!isTransmissionCompleted) {
         Thread.sleep(CHECK_TRANSMISSION_COMPLETION_INTERVAL_IN_MILLISECONDS);
-        // Only wait coordinator to transfer snapshot instead of waiting all peers completing data
-        // transfer. Keep consistent with IoTV1.
-        isTransmissionCompleted =
-            isConsensusPipesTransmissionCompleted(
-                Collections.singletonList(new ConsensusPipeName(thisNode, targetPeer).toString()),
-                isFirstCheck);
 
-        isFirstCheck = false;
+        if (isConsensusPipesTransmissionCompleted(
+            Collections.singletonList(new ConsensusPipeName(thisNode, targetPeer).toString()),
+            isFirstCheckForCurrentPeer)) {
+          final List<Peer> otherPeers = peerManager.getOtherPeers(thisNode);
+
+          isTransmissionCompleted = true;
+          for (Peer peer : otherPeers) {
+            if (!peer.equals(targetPeer)) {
+              isTransmissionCompleted &=
+                  isRemotePeerConsensusPipesTransmissionCompleted(
+                      peer,
+                      Collections.singletonList(new ConsensusPipeName(peer, targetPeer).toString()),
+                      isFirstCheckForOtherPeers);
+            }
+          }
+          isFirstCheckForOtherPeers = false;
+        }
+        isFirstCheckForCurrentPeer = false;
       }
     } catch (InterruptedException e) {
       LOGGER.warn("{} is interrupted when waiting for transfer completed", thisNode, e);
@@ -560,6 +572,7 @@ public class PipeConsensusServerImpl {
     }
   }
 
+  /** Wait for the user written data up to firstCheck to be replicated */
   public void waitTargetPeerToPeersTransmissionCompleted(Peer targetPeer)
       throws ConsensusGroupModifyPeerException {
     boolean isTransmissionCompleted = false;
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
index f5de22508ba..1fc2abe03bf 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
@@ -41,7 +41,6 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CAPTURE_TREE_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_GROUP_ID_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY;
-import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_RESTORE_PROGRESS_PIPE_TASK_NAME_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_SENDER_DATANODE_ID_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_INCLUSION_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_KEY;
@@ -69,7 +68,7 @@ public class ConsensusPipeManager {
     // The third parameter is only used when region migration. Since this method is not called by
     // region migration, just pass senderPeer in to get the correct result.
     Triple<ImmutableMap<String, String>, ImmutableMap<String, String>, ImmutableMap<String, String>>
-        params = buildPipeParams(senderPeer, receiverPeer, senderPeer);
+        params = buildPipeParams(senderPeer, receiverPeer);
     dispatcher.createPipe(
         consensusPipeName.toString(),
         params.getLeft(),
@@ -79,15 +78,11 @@ public class ConsensusPipeManager {
   }
 
   /** This method is used when executing region migration */
-  public void createConsensusPipe(
-      Peer senderPeer,
-      Peer receiverPeer,
-      Peer regionMigrationCoordinatorPeer,
-      boolean needManuallyStart)
+  public void createConsensusPipe(Peer senderPeer, Peer receiverPeer, boolean needManuallyStart)
       throws Exception {
     ConsensusPipeName consensusPipeName = new ConsensusPipeName(senderPeer, receiverPeer);
     Triple<ImmutableMap<String, String>, ImmutableMap<String, String>, ImmutableMap<String, String>>
-        params = buildPipeParams(senderPeer, receiverPeer, regionMigrationCoordinatorPeer);
+        params = buildPipeParams(senderPeer, receiverPeer);
     dispatcher.createPipe(
         consensusPipeName.toString(),
         params.getLeft(),
@@ -98,52 +93,25 @@ public class ConsensusPipeManager {
 
   public Triple<
           ImmutableMap<String, String>, ImmutableMap<String, String>, ImmutableMap<String, String>>
-      buildPipeParams(Peer senderPeer, Peer receiverPeer, Peer regionMigrationCoordinatorPeer) {
+      buildPipeParams(Peer senderPeer, Peer receiverPeer) {
     ConsensusPipeName consensusPipeName = new ConsensusPipeName(senderPeer, receiverPeer);
-
-    ImmutableMap<String, String> extractorParams;
-    if (senderPeer.equals(regionMigrationCoordinatorPeer)) {
-      extractorParams =
-          ImmutableMap.<String, String>builder()
-              .put(EXTRACTOR_KEY, config.getExtractorPluginName())
-              .put(EXTRACTOR_INCLUSION_KEY, CONSENSUS_EXTRACTOR_INCLUSION_VALUE)
-              .put(
-                  EXTRACTOR_CONSENSUS_GROUP_ID_KEY,
-                  consensusPipeName.getConsensusGroupId().toString())
-              .put(
-                  EXTRACTOR_CONSENSUS_SENDER_DATANODE_ID_KEY,
-                  String.valueOf(consensusPipeName.getSenderDataNodeId()))
-              .put(
-                  EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY,
-                  String.valueOf(consensusPipeName.getReceiverDataNodeId()))
-              .put(EXTRACTOR_REALTIME_MODE_KEY, replicateMode.getValue())
-              .put(EXTRACTOR_CAPTURE_TABLE_KEY, String.valueOf(true))
-              .put(EXTRACTOR_CAPTURE_TREE_KEY, String.valueOf(true))
-              .build();
-    } else {
-      extractorParams =
-          ImmutableMap.<String, String>builder()
-              .put(EXTRACTOR_KEY, config.getExtractorPluginName())
-              .put(EXTRACTOR_INCLUSION_KEY, CONSENSUS_EXTRACTOR_INCLUSION_VALUE)
-              .put(
-                  EXTRACTOR_CONSENSUS_GROUP_ID_KEY,
-                  consensusPipeName.getConsensusGroupId().toString())
-              .put(
-                  EXTRACTOR_CONSENSUS_SENDER_DATANODE_ID_KEY,
-                  String.valueOf(consensusPipeName.getSenderDataNodeId()))
-              .put(
-                  EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY,
-                  String.valueOf(consensusPipeName.getReceiverDataNodeId()))
-              .put(EXTRACTOR_REALTIME_MODE_KEY, replicateMode.getValue())
-              .put(EXTRACTOR_CAPTURE_TABLE_KEY, String.valueOf(true))
-              .put(EXTRACTOR_CAPTURE_TREE_KEY, String.valueOf(true))
-              .put(
-                  EXTRACTOR_CONSENSUS_RESTORE_PROGRESS_PIPE_TASK_NAME_KEY,
-                  String.valueOf(new ConsensusPipeName(senderPeer, regionMigrationCoordinatorPeer)))
-              .build();
-    }
     return new ImmutableTriple<>(
-        extractorParams,
+        ImmutableMap.<String, String>builder()
+            .put(EXTRACTOR_KEY, config.getExtractorPluginName())
+            .put(EXTRACTOR_INCLUSION_KEY, CONSENSUS_EXTRACTOR_INCLUSION_VALUE)
+            .put(
+                EXTRACTOR_CONSENSUS_GROUP_ID_KEY,
+                consensusPipeName.getConsensusGroupId().toString())
+            .put(
+                EXTRACTOR_CONSENSUS_SENDER_DATANODE_ID_KEY,
+                String.valueOf(consensusPipeName.getSenderDataNodeId()))
+            .put(
+                EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY,
+                String.valueOf(consensusPipeName.getReceiverDataNodeId()))
+            .put(EXTRACTOR_REALTIME_MODE_KEY, replicateMode.getValue())
+            .put(EXTRACTOR_CAPTURE_TABLE_KEY, String.valueOf(true))
+            .put(EXTRACTOR_CAPTURE_TREE_KEY, String.valueOf(true))
+            .build(),
         ImmutableMap.<String, String>builder()
             .put(PROCESSOR_KEY, config.getProcessorPluginName())
             .build(),
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java
index 6d75ddfa680..3aa69af6ff5 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java
@@ -126,11 +126,7 @@ public class PipeConsensusRPCServiceProcessor implements PipeConsensusIService.I
               ConsensusGroupId.Factory.createFromTConsensusGroupId(req.targetPeerConsensusGroupId),
               req.targetPeerNodeId,
               req.targetPeerEndPoint),
-          new Peer(
-              ConsensusGroupId.Factory.createFromTConsensusGroupId(req.targetPeerConsensusGroupId),
-              req.coordinatorPeerNodeId,
-              req.coordinatorPeerEndPoint),
-          true);
+          false);
       responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     } catch (ConsensusGroupModifyPeerException e) {
       responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
index ae08fa8b0cb..011bfcc389a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
@@ -31,8 +31,6 @@ import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeE
 import org.apache.iotdb.commons.pipe.datastructure.PersistentResource;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
-import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
-import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource;
 import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
 import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
@@ -73,7 +71,6 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_RESTORE_PROGRESS_PIPE_TASK_NAME_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_END_TIME_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
@@ -308,20 +305,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionExtractor
     pipeName = environment.getPipeName();
     creationTime = environment.getCreationTime();
     pipeTaskMeta = environment.getPipeTaskMeta();
-    if (parameters.hasAnyAttributes(EXTRACTOR_CONSENSUS_RESTORE_PROGRESS_PIPE_TASK_NAME_KEY)) {
-      ConsensusPipeName currentNode2CoordinatorPipeName =
-          new ConsensusPipeName(
-              parameters.getString(EXTRACTOR_CONSENSUS_RESTORE_PROGRESS_PIPE_TASK_NAME_KEY));
-      // For region migration in IoTV2, non-coordinators will only transfer data after
-      // `ProgressIndex(non-coordinators2coordinator)`
-      startIndex =
-          PipeDataNodeAgent.task()
-              .getPipeTaskProgressIndex(
-                  currentNode2CoordinatorPipeName.toString(),
-                  currentNode2CoordinatorPipeName.getConsensusGroupId().getId());
-    } else {
-      startIndex = environment.getPipeTaskMeta().getProgressIndex();
-    }
+    startIndex = environment.getPipeTaskMeta().getProgressIndex();
 
     dataRegionId = environment.getRegionId();
     synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
@@ -750,7 +734,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionExtractor
         "Pipe {}@{}: finish to extract deletions, extract deletions count {}/{}, took {} ms",
         pipeName,
         dataRegionId,
-        resourceList.size(),
+        allDeletionResources.size(),
         originalDeletionCount,
         System.currentTimeMillis() - startTime);
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
index bc57308f1e8..7522cd444fe 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
@@ -135,8 +135,6 @@ public class PipeExtractorConstant {
       "extractor.consensus.sender-dn-id";
   public static final String EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY =
       "extractor.consensus.receiver-dn-id";
-  public static final String EXTRACTOR_CONSENSUS_RESTORE_PROGRESS_PIPE_TASK_NAME_KEY =
-      "extractor.consensus.restore-progress-pipe-task-name";
 
   private PipeExtractorConstant() {
     throw new IllegalStateException("Utility class");
diff --git a/iotdb-protocol/thrift-consensus/src/main/thrift/pipeconsensus.thrift b/iotdb-protocol/thrift-consensus/src/main/thrift/pipeconsensus.thrift
index 3d3bec771db..ca7a043c007 100644
--- a/iotdb-protocol/thrift-consensus/src/main/thrift/pipeconsensus.thrift
+++ b/iotdb-protocol/thrift-consensus/src/main/thrift/pipeconsensus.thrift
@@ -63,8 +63,6 @@ struct TNotifyPeerToCreateConsensusPipeReq {
   1: required common.TConsensusGroupId targetPeerConsensusGroupId
   2: required common.TEndPoint targetPeerEndPoint
   3: required i32 targetPeerNodeId
-  4: required common.TEndPoint coordinatorPeerEndPoint
-  5: required i32 coordinatorPeerNodeId
 }
 
 struct TNotifyPeerToCreateConsensusPipeResp {

Reply via email to