This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b34b714c80e [IoTConsensusV2 X Region Migration] Start consensus pipe
in proper time rather than automatically start it when create it
b34b714c80e is described below
commit b34b714c80e386e1f4ff37e277b0443a1a9f6f10
Author: Peng Junzhi <[email protected]>
AuthorDate: Thu Nov 28 10:31:24 2024 +0800
[IoTConsensusV2 X Region Migration] Start consensus pipe in proper time
rather than automatically start it when create it
---
.../impl/pipe/task/CreatePipeProcedureV2.java | 4 +++-
.../apache/iotdb/consensus/pipe/PipeConsensus.java | 13 ++++++++----
.../consensus/pipe/PipeConsensusServerImpl.java | 24 +++++++++++++++++++---
.../consensuspipe/ConsensusPipeDispatcher.java | 3 ++-
.../pipe/consensuspipe/ConsensusPipeManager.java | 18 +++++++++++++---
.../service/PipeConsensusRPCServiceProcessor.java | 3 ++-
.../consensus/ConsensusPipeDataNodeDispatcher.java | 4 +++-
.../src/main/thrift/confignode.thrift | 1 +
8 files changed, 56 insertions(+), 14 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index c552191a856..c08c9a2a2b7 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -203,7 +203,9 @@ public class CreatePipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
}
pipeRuntimeMeta = new PipeRuntimeMeta(consensusGroupIdToTaskMetaMap);
- pipeRuntimeMeta.getStatus().set(PipeStatus.RUNNING);
+ if (!createPipeRequest.needManuallyStart) {
+ pipeRuntimeMeta.getStatus().set(PipeStatus.RUNNING);
+ }
}
@Override
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
index 081085f2834..358b34fb98d 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
@@ -332,9 +332,8 @@ public class PipeConsensus implements IConsensus {
impl.setRemotePeerActive(peer, false);
// step 2: notify all the other Peers to create consensus pipes to
newPeer
- // NOTE: For this step, coordinator(thisNode) will transfer its full
data snapshot to target,
- // while other peers will only transmit data(may contain both historical
and realtime data)
- // after the snapshot progress to target.
+ // NOTE: For this step, coordinator(thisNode) will transfer its full
data snapshot to target
+ // while other peers record the coordinator's progress.
LOGGER.info("[{}] notify current peers to create consensus pipes...",
CLASS_NAME);
impl.notifyPeersToCreateConsensusPipes(peer, impl.getThisNodePeer());
KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_TRANSITION);
@@ -343,7 +342,13 @@ public class PipeConsensus implements IConsensus {
LOGGER.info("[{}] wait until all the other peers finish
transferring...", CLASS_NAME);
impl.waitPeersToTargetPeerTransmissionCompleted(peer);
- // step 4: active new Peer to let new Peer receive snapshot
+ // step 4. start other peers' consensus pipe to target peer to transfer
remaining data
+ // NOTE: For this step, other peers will start to transmit data(may
contain both historical
+ // and realtime data) after the snapshot progress to target.
+ LOGGER.info("[{}] start transfer remaining data from other peers",
CLASS_NAME);
+ impl.startOtherConsensusPipesToTargetPeer(peer);
+
+ // step 5: active new Peer to let new Peer receive client requests
LOGGER.info("[{}] activate new peer...", CLASS_NAME);
impl.setRemotePeerActive(peer, true);
KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_DONE);
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
index 882cca72c25..4c2855dc412 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
@@ -425,7 +425,7 @@ public class PipeConsensusServerImpl {
try {
// This node which acts as coordinator will transfer complete historical
snapshot to new
// target.
- createConsensusPipeToTargetPeer(targetPeer, thisNode);
+ createConsensusPipeToTargetPeer(targetPeer, thisNode, false);
} catch (Exception e) {
LOGGER.warn(
"{} cannot create consensus pipe to {}, may because target peer is
unknown currently, please manually check!",
@@ -437,11 +437,11 @@ public class PipeConsensusServerImpl {
}
public synchronized void createConsensusPipeToTargetPeer(
- Peer targetPeer, Peer regionMigrationCoordinatorPeer)
+ Peer targetPeer, Peer regionMigrationCoordinatorPeer, boolean
needManuallyStart)
throws ConsensusGroupModifyPeerException {
try {
consensusPipeManager.createConsensusPipe(
- thisNode, targetPeer, regionMigrationCoordinatorPeer);
+ thisNode, targetPeer, regionMigrationCoordinatorPeer,
needManuallyStart);
peerManager.addAndPersist(targetPeer);
} catch (IOException e) {
LOGGER.warn("{} cannot persist peer {}", thisNode, targetPeer, e);
@@ -510,6 +510,24 @@ public class PipeConsensusServerImpl {
}
}
+ public void startOtherConsensusPipesToTargetPeer(Peer targetPeer)
+ throws ConsensusGroupModifyPeerException {
+ final List<Peer> otherPeers = peerManager.getOtherPeers(thisNode);
+ for (Peer peer : otherPeers) {
+ if (peer.equals(targetPeer)) {
+ continue;
+ }
+ try {
+ consensusPipeManager.updateConsensusPipe(
+ new ConsensusPipeName(peer, targetPeer), PipeStatus.RUNNING);
+ } catch (Exception e) {
+ LOGGER.warn("{} cannot start consensus pipe to {}", peer, targetPeer,
e);
+ throw new ConsensusGroupModifyPeerException(
+ String.format("%s cannot start consensus pipe to %s", peer,
targetPeer), e);
+ }
+ }
+ }
+
public void waitPeersToTargetPeerTransmissionCompleted(Peer targetPeer)
throws ConsensusGroupModifyPeerException {
boolean isTransmissionCompleted = false;
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeDispatcher.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeDispatcher.java
index ac7045a5514..568f68bb577 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeDispatcher.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeDispatcher.java
@@ -26,7 +26,8 @@ public interface ConsensusPipeDispatcher {
String pipeName,
Map<String, String> extractorAttributes,
Map<String, String> processorAttributes,
- Map<String, String> connectorAttributes)
+ Map<String, String> connectorAttributes,
+ boolean needManuallyStart)
throws Exception;
void startPipe(String pipeName) throws Exception;
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
index aed0155e9c3..f5de22508ba 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
@@ -71,17 +71,29 @@ public class ConsensusPipeManager {
Triple<ImmutableMap<String, String>, ImmutableMap<String, String>,
ImmutableMap<String, String>>
params = buildPipeParams(senderPeer, receiverPeer, senderPeer);
dispatcher.createPipe(
- consensusPipeName.toString(), params.getLeft(), params.getMiddle(),
params.getRight());
+ consensusPipeName.toString(),
+ params.getLeft(),
+ params.getMiddle(),
+ params.getRight(),
+ false);
}
/** This method is used when executing region migration */
public void createConsensusPipe(
- Peer senderPeer, Peer receiverPeer, Peer regionMigrationCoordinatorPeer)
throws Exception {
+ Peer senderPeer,
+ Peer receiverPeer,
+ Peer regionMigrationCoordinatorPeer,
+ boolean needManuallyStart)
+ throws Exception {
ConsensusPipeName consensusPipeName = new ConsensusPipeName(senderPeer,
receiverPeer);
Triple<ImmutableMap<String, String>, ImmutableMap<String, String>,
ImmutableMap<String, String>>
params = buildPipeParams(senderPeer, receiverPeer,
regionMigrationCoordinatorPeer);
dispatcher.createPipe(
- consensusPipeName.toString(), params.getLeft(), params.getMiddle(),
params.getRight());
+ consensusPipeName.toString(),
+ params.getLeft(),
+ params.getMiddle(),
+ params.getRight(),
+ needManuallyStart);
}
public Triple<
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java
index cfd7a464ceb..378ffa6f78d 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java
@@ -117,7 +117,8 @@ public class PipeConsensusRPCServiceProcessor implements
PipeConsensusIService.I
new Peer(
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.targetPeerConsensusGroupId),
req.coordinatorPeerNodeId,
- req.coordinatorPeerEndPoint));
+ req.coordinatorPeerEndPoint),
+ true);
responseStatus = new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (ConsensusGroupModifyPeerException e) {
responseStatus = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeDispatcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeDispatcher.java
index 9a95d833788..545d068e53a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeDispatcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeDispatcher.java
@@ -49,13 +49,15 @@ public class ConsensusPipeDataNodeDispatcher implements
ConsensusPipeDispatcher
String pipeName,
Map<String, String> extractorAttributes,
Map<String, String> processorAttributes,
- Map<String, String> connectorAttributes)
+ Map<String, String> connectorAttributes,
+ boolean needManuallyStart)
throws Exception {
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TCreatePipeReq req =
new TCreatePipeReq()
.setPipeName(pipeName)
+ .setNeedManuallyStart(needManuallyStart)
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes)
.setConnectorAttributes(connectorAttributes);
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index 7fdf78e7152..dc647955443 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -737,6 +737,7 @@ struct TCreatePipeReq {
3: optional map<string, string> processorAttributes
4: required map<string, string> connectorAttributes
5: optional bool ifNotExistsCondition
+ 6: optional bool needManuallyStart
}
struct TAlterPipeReq {