This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c789ba8e555 fix addRemotePeer data inconsistency (#14332)
c789ba8e555 is described below
commit c789ba8e5555d8bacaad31b7089d9b25058a647f
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Dec 6 11:13:24 2024 +0800
fix addRemotePeer data inconsistency (#14332)
---
.../apache/iotdb/consensus/pipe/PipeConsensus.java | 15 ++---
.../consensus/pipe/PipeConsensusServerImpl.java | 47 +++++++++-----
.../pipe/consensuspipe/ConsensusPipeManager.java | 72 ++++++----------------
.../service/PipeConsensusRPCServiceProcessor.java | 6 +-
...oricalDataRegionTsFileAndDeletionExtractor.java | 20 +-----
.../config/constant/PipeExtractorConstant.java | 2 -
.../src/main/thrift/pipeconsensus.thrift | 2 -
7 files changed, 57 insertions(+), 107 deletions(-)
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
index 6d3417953d4..df47567e445 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
@@ -332,23 +332,16 @@ public class PipeConsensus implements IConsensus {
impl.setRemotePeerActive(peer, false, false);
// step 2: notify all the other Peers to create consensus pipes to
newPeer
- // NOTE: For this step, coordinator(thisNode) will transfer its full
data snapshot to target
- // while other peers record the coordinator's progress.
+ // NOTE: For this step, all the other peers will try to transfer their
user-written data to target
LOGGER.info("[{}] notify current peers to create consensus pipes...",
CLASS_NAME);
- impl.notifyPeersToCreateConsensusPipes(peer, impl.getThisNodePeer());
+ impl.notifyPeersToCreateConsensusPipes(peer);
KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_TRANSITION);
- // step 3: wait until the coordinator Peer finishes transferring snapshot
+ // step 3: wait until all other Peers finish transferring
LOGGER.info("[{}] wait until all the other peers finish
transferring...", CLASS_NAME);
impl.waitPeersToTargetPeerTransmissionCompleted(peer);
- // step 4. start other peers' consensus pipe to target peer to transfer
remaining data
- // NOTE: For this step, other peers will start to transmit data(may
contain both historical
- // and realtime data) after the snapshot progress to target.
- LOGGER.info("[{}] start transfer remaining data from other peers",
CLASS_NAME);
- impl.startOtherConsensusPipesToTargetPeer(peer);
-
- // step 5: active new Peer to let new Peer receive client requests
+ // step 4: activate new Peer to let new Peer receive client requests
LOGGER.info("[{}] activate new peer...", CLASS_NAME);
impl.setRemotePeerActive(peer, true, false);
KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_DONE);
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
index 3a278a80c3f..b085abbd64b 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
@@ -401,10 +401,13 @@ public class PipeConsensusServerImpl {
}
}
- public void notifyPeersToCreateConsensusPipes(Peer targetPeer, Peer
coordinatorPeer)
+ public void notifyPeersToCreateConsensusPipes(Peer targetPeer)
throws ConsensusGroupModifyPeerException {
final List<Peer> otherPeers = peerManager.getOtherPeers(thisNode);
for (Peer peer : otherPeers) {
+ if (peer.equals(targetPeer)) {
+ continue;
+ }
try (SyncPipeConsensusServiceClient client =
syncClientManager.borrowClient(peer.getEndpoint())) {
TNotifyPeerToCreateConsensusPipeResp resp =
@@ -412,9 +415,7 @@ public class PipeConsensusServerImpl {
new TNotifyPeerToCreateConsensusPipeReq(
targetPeer.getGroupId().convertToTConsensusGroupId(),
targetPeer.getEndpoint(),
- targetPeer.getNodeId(),
- coordinatorPeer.getEndpoint(),
- coordinatorPeer.getNodeId()));
+ targetPeer.getNodeId()));
if (!RpcUtils.SUCCESS_STATUS.equals(resp.getStatus())) {
throw new ConsensusGroupModifyPeerException(
String.format("error when notify peer %s to create consensus
pipe", peer));
@@ -431,7 +432,7 @@ public class PipeConsensusServerImpl {
try {
// This node which acts as coordinator will transfer complete historical
snapshot to new
// target.
- createConsensusPipeToTargetPeer(targetPeer, thisNode, false);
+ createConsensusPipeToTargetPeer(targetPeer, false);
} catch (Exception e) {
LOGGER.warn(
"{} cannot create consensus pipe to {}, may because target peer is
unknown currently, please manually check!",
@@ -443,12 +444,10 @@ public class PipeConsensusServerImpl {
}
public synchronized void createConsensusPipeToTargetPeer(
- Peer targetPeer, Peer regionMigrationCoordinatorPeer, boolean
needManuallyStart)
- throws ConsensusGroupModifyPeerException {
+ Peer targetPeer, boolean needManuallyStart) throws
ConsensusGroupModifyPeerException {
try {
KillPoint.setKillPoint(DataNodeKillPoints.ORIGINAL_ADD_PEER_DONE);
- consensusPipeManager.createConsensusPipe(
- thisNode, targetPeer, regionMigrationCoordinatorPeer,
needManuallyStart);
+ consensusPipeManager.createConsensusPipe(thisNode, targetPeer,
needManuallyStart);
peerManager.addAndPersist(targetPeer);
} catch (IOException e) {
LOGGER.warn("{} cannot persist peer {}", thisNode, targetPeer, e);
@@ -535,22 +534,35 @@ public class PipeConsensusServerImpl {
}
}
+ /** Wait for the user written data up to firstCheck to be replicated */
public void waitPeersToTargetPeerTransmissionCompleted(Peer targetPeer)
throws ConsensusGroupModifyPeerException {
boolean isTransmissionCompleted = false;
- boolean isFirstCheck = true;
+ boolean isFirstCheckForCurrentPeer = true;
+ boolean isFirstCheckForOtherPeers = true;
try {
while (!isTransmissionCompleted) {
Thread.sleep(CHECK_TRANSMISSION_COMPLETION_INTERVAL_IN_MILLISECONDS);
- // Only wait coordinator to transfer snapshot instead of waiting all
peers completing data
- // transfer. Keep consistent with IoTV1.
- isTransmissionCompleted =
- isConsensusPipesTransmissionCompleted(
- Collections.singletonList(new ConsensusPipeName(thisNode,
targetPeer).toString()),
- isFirstCheck);
- isFirstCheck = false;
+ if (isConsensusPipesTransmissionCompleted(
+ Collections.singletonList(new ConsensusPipeName(thisNode,
targetPeer).toString()),
+ isFirstCheckForCurrentPeer)) {
+ final List<Peer> otherPeers = peerManager.getOtherPeers(thisNode);
+
+ isTransmissionCompleted = true;
+ for (Peer peer : otherPeers) {
+ if (!peer.equals(targetPeer)) {
+ isTransmissionCompleted &=
+ isRemotePeerConsensusPipesTransmissionCompleted(
+ peer,
+ Collections.singletonList(new ConsensusPipeName(peer,
targetPeer).toString()),
+ isFirstCheckForOtherPeers);
+ }
+ }
+ isFirstCheckForOtherPeers = false;
+ }
+ isFirstCheckForCurrentPeer = false;
}
} catch (InterruptedException e) {
LOGGER.warn("{} is interrupted when waiting for transfer completed",
thisNode, e);
@@ -560,6 +572,7 @@ public class PipeConsensusServerImpl {
}
}
+ /** Wait for the user written data up to firstCheck to be replicated */
public void waitTargetPeerToPeersTransmissionCompleted(Peer targetPeer)
throws ConsensusGroupModifyPeerException {
boolean isTransmissionCompleted = false;
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
index f5de22508ba..1fc2abe03bf 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
@@ -41,7 +41,6 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CAPTURE_TREE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_GROUP_ID_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_RESTORE_PROGRESS_PIPE_TASK_NAME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_SENDER_DATANODE_ID_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_INCLUSION_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_KEY;
@@ -69,7 +68,7 @@ public class ConsensusPipeManager {
// The third parameter is only used when region migration. Since this
method is not called by
// region migration, just pass senderPeer in to get the correct result.
Triple<ImmutableMap<String, String>, ImmutableMap<String, String>,
ImmutableMap<String, String>>
- params = buildPipeParams(senderPeer, receiverPeer, senderPeer);
+ params = buildPipeParams(senderPeer, receiverPeer);
dispatcher.createPipe(
consensusPipeName.toString(),
params.getLeft(),
@@ -79,15 +78,11 @@ public class ConsensusPipeManager {
}
/** This method is used when executing region migration */
- public void createConsensusPipe(
- Peer senderPeer,
- Peer receiverPeer,
- Peer regionMigrationCoordinatorPeer,
- boolean needManuallyStart)
+ public void createConsensusPipe(Peer senderPeer, Peer receiverPeer, boolean
needManuallyStart)
throws Exception {
ConsensusPipeName consensusPipeName = new ConsensusPipeName(senderPeer,
receiverPeer);
Triple<ImmutableMap<String, String>, ImmutableMap<String, String>,
ImmutableMap<String, String>>
- params = buildPipeParams(senderPeer, receiverPeer,
regionMigrationCoordinatorPeer);
+ params = buildPipeParams(senderPeer, receiverPeer);
dispatcher.createPipe(
consensusPipeName.toString(),
params.getLeft(),
@@ -98,52 +93,25 @@ public class ConsensusPipeManager {
public Triple<
ImmutableMap<String, String>, ImmutableMap<String, String>,
ImmutableMap<String, String>>
- buildPipeParams(Peer senderPeer, Peer receiverPeer, Peer
regionMigrationCoordinatorPeer) {
+ buildPipeParams(Peer senderPeer, Peer receiverPeer) {
ConsensusPipeName consensusPipeName = new ConsensusPipeName(senderPeer,
receiverPeer);
-
- ImmutableMap<String, String> extractorParams;
- if (senderPeer.equals(regionMigrationCoordinatorPeer)) {
- extractorParams =
- ImmutableMap.<String, String>builder()
- .put(EXTRACTOR_KEY, config.getExtractorPluginName())
- .put(EXTRACTOR_INCLUSION_KEY,
CONSENSUS_EXTRACTOR_INCLUSION_VALUE)
- .put(
- EXTRACTOR_CONSENSUS_GROUP_ID_KEY,
- consensusPipeName.getConsensusGroupId().toString())
- .put(
- EXTRACTOR_CONSENSUS_SENDER_DATANODE_ID_KEY,
- String.valueOf(consensusPipeName.getSenderDataNodeId()))
- .put(
- EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY,
- String.valueOf(consensusPipeName.getReceiverDataNodeId()))
- .put(EXTRACTOR_REALTIME_MODE_KEY, replicateMode.getValue())
- .put(EXTRACTOR_CAPTURE_TABLE_KEY, String.valueOf(true))
- .put(EXTRACTOR_CAPTURE_TREE_KEY, String.valueOf(true))
- .build();
- } else {
- extractorParams =
- ImmutableMap.<String, String>builder()
- .put(EXTRACTOR_KEY, config.getExtractorPluginName())
- .put(EXTRACTOR_INCLUSION_KEY,
CONSENSUS_EXTRACTOR_INCLUSION_VALUE)
- .put(
- EXTRACTOR_CONSENSUS_GROUP_ID_KEY,
- consensusPipeName.getConsensusGroupId().toString())
- .put(
- EXTRACTOR_CONSENSUS_SENDER_DATANODE_ID_KEY,
- String.valueOf(consensusPipeName.getSenderDataNodeId()))
- .put(
- EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY,
- String.valueOf(consensusPipeName.getReceiverDataNodeId()))
- .put(EXTRACTOR_REALTIME_MODE_KEY, replicateMode.getValue())
- .put(EXTRACTOR_CAPTURE_TABLE_KEY, String.valueOf(true))
- .put(EXTRACTOR_CAPTURE_TREE_KEY, String.valueOf(true))
- .put(
- EXTRACTOR_CONSENSUS_RESTORE_PROGRESS_PIPE_TASK_NAME_KEY,
- String.valueOf(new ConsensusPipeName(senderPeer,
regionMigrationCoordinatorPeer)))
- .build();
- }
return new ImmutableTriple<>(
- extractorParams,
+ ImmutableMap.<String, String>builder()
+ .put(EXTRACTOR_KEY, config.getExtractorPluginName())
+ .put(EXTRACTOR_INCLUSION_KEY, CONSENSUS_EXTRACTOR_INCLUSION_VALUE)
+ .put(
+ EXTRACTOR_CONSENSUS_GROUP_ID_KEY,
+ consensusPipeName.getConsensusGroupId().toString())
+ .put(
+ EXTRACTOR_CONSENSUS_SENDER_DATANODE_ID_KEY,
+ String.valueOf(consensusPipeName.getSenderDataNodeId()))
+ .put(
+ EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY,
+ String.valueOf(consensusPipeName.getReceiverDataNodeId()))
+ .put(EXTRACTOR_REALTIME_MODE_KEY, replicateMode.getValue())
+ .put(EXTRACTOR_CAPTURE_TABLE_KEY, String.valueOf(true))
+ .put(EXTRACTOR_CAPTURE_TREE_KEY, String.valueOf(true))
+ .build(),
ImmutableMap.<String, String>builder()
.put(PROCESSOR_KEY, config.getProcessorPluginName())
.build(),
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java
index 6d75ddfa680..3aa69af6ff5 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java
@@ -126,11 +126,7 @@ public class PipeConsensusRPCServiceProcessor implements
PipeConsensusIService.I
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.targetPeerConsensusGroupId),
req.targetPeerNodeId,
req.targetPeerEndPoint),
- new Peer(
-
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.targetPeerConsensusGroupId),
- req.coordinatorPeerNodeId,
- req.coordinatorPeerEndPoint),
- true);
+ false);
responseStatus = new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (ConsensusGroupModifyPeerException e) {
responseStatus = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
index ae08fa8b0cb..011bfcc389a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
@@ -31,8 +31,6 @@ import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeE
import org.apache.iotdb.commons.pipe.datastructure.PersistentResource;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
-import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
-import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
@@ -73,7 +71,6 @@ import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_RESTORE_PROGRESS_PIPE_TASK_NAME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_END_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
@@ -308,20 +305,7 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionExtractor
pipeName = environment.getPipeName();
creationTime = environment.getCreationTime();
pipeTaskMeta = environment.getPipeTaskMeta();
- if
(parameters.hasAnyAttributes(EXTRACTOR_CONSENSUS_RESTORE_PROGRESS_PIPE_TASK_NAME_KEY))
{
- ConsensusPipeName currentNode2CoordinatorPipeName =
- new ConsensusPipeName(
-
parameters.getString(EXTRACTOR_CONSENSUS_RESTORE_PROGRESS_PIPE_TASK_NAME_KEY));
- // For region migration in IoTV2, non-coordinators will only transfer
data after
- // `ProgressIndex(non-coordinators2coordinator)`
- startIndex =
- PipeDataNodeAgent.task()
- .getPipeTaskProgressIndex(
- currentNode2CoordinatorPipeName.toString(),
-
currentNode2CoordinatorPipeName.getConsensusGroupId().getId());
- } else {
- startIndex = environment.getPipeTaskMeta().getProgressIndex();
- }
+ startIndex = environment.getPipeTaskMeta().getProgressIndex();
dataRegionId = environment.getRegionId();
synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
@@ -750,7 +734,7 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionExtractor
"Pipe {}@{}: finish to extract deletions, extract deletions count
{}/{}, took {} ms",
pipeName,
dataRegionId,
- resourceList.size(),
+ allDeletionResources.size(),
originalDeletionCount,
System.currentTimeMillis() - startTime);
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
index bc57308f1e8..7522cd444fe 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
@@ -135,8 +135,6 @@ public class PipeExtractorConstant {
"extractor.consensus.sender-dn-id";
public static final String EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY =
"extractor.consensus.receiver-dn-id";
- public static final String
EXTRACTOR_CONSENSUS_RESTORE_PROGRESS_PIPE_TASK_NAME_KEY =
- "extractor.consensus.restore-progress-pipe-task-name";
private PipeExtractorConstant() {
throw new IllegalStateException("Utility class");
diff --git
a/iotdb-protocol/thrift-consensus/src/main/thrift/pipeconsensus.thrift
b/iotdb-protocol/thrift-consensus/src/main/thrift/pipeconsensus.thrift
index 3d3bec771db..ca7a043c007 100644
--- a/iotdb-protocol/thrift-consensus/src/main/thrift/pipeconsensus.thrift
+++ b/iotdb-protocol/thrift-consensus/src/main/thrift/pipeconsensus.thrift
@@ -63,8 +63,6 @@ struct TNotifyPeerToCreateConsensusPipeReq {
1: required common.TConsensusGroupId targetPeerConsensusGroupId
2: required common.TEndPoint targetPeerEndPoint
3: required i32 targetPeerNodeId
- 4: required common.TEndPoint coordinatorPeerEndPoint
- 5: required i32 coordinatorPeerNodeId
}
struct TNotifyPeerToCreateConsensusPipeResp {