This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 35ef0c8ba42 Fix deadlock between DataNode createDataRegion and
ConfigNode PipeTaskCoordinatorLock by delegating consensus pipe lifecycle to
ConfigNode (#17233)
35ef0c8ba42 is described below
commit 35ef0c8ba4221d5fcba5f658e54f617f40001e28
Author: Peng Junzhi <[email protected]>
AuthorDate: Wed Mar 4 14:19:18 2026 +0800
Fix deadlock between DataNode createDataRegion and ConfigNode
PipeTaskCoordinatorLock by delegating consensus pipe lifecycle to ConfigNode
(#17233)
* Fix deadlock between DataNode createDataRegion and ConfigNode
PipeTaskCoordinatorLock by delegating consensus pipe lifecycle to ConfigNode
* Make the ConfigNode read the IoTConsensusV2 replication mode
* remove duplicated code
* spotless
* Fix consensus pipes not being created when a data region is first created
* Create consensus pipes asynchronously to avoid an audit-log deadlock
* Fix the PipeTaskCoordinator lock not supporting acquire and release on different threads
* Fix a data race when a DataNode and a ConfigNode restart at the same time,
and fix the DataNode from which a region was removed failing to clear the region files
* refine code
* remove useless comment
* PipeTaskCoordinator.unlock() no longer returns a value
---
.../iotdb/confignode/conf/ConfigNodeConfig.java | 11 +
.../confignode/conf/ConfigNodeDescriptor.java | 3 +
.../iotdb/confignode/manager/ProcedureManager.java | 32 +++
.../pipe/coordinator/task/PipeTaskCoordinator.java | 14 +-
.../coordinator/task/PipeTaskCoordinatorLock.java | 19 +-
.../subscription/SubscriptionCoordinator.java | 10 +-
.../procedure/env/RegionMaintainHandler.java | 245 +++++++++++++++++++--
.../impl/region/AddRegionPeerProcedure.java | 5 +
.../impl/region/CreateRegionGroupsProcedure.java | 6 +
.../impl/region/RemoveRegionPeerProcedure.java | 45 +++-
.../procedure/state/AddRegionPeerState.java | 1 +
.../procedure/state/CreateRegionGroupsState.java | 5 +-
.../procedure/state/RemoveRegionPeerState.java | 1 +
.../consensus/config/PipeConsensusConfig.java | 16 --
.../apache/iotdb/consensus/pipe/PipeConsensus.java | 39 ++--
.../consensus/pipe/PipeConsensusServerImpl.java | 223 +++----------------
.../consensuspipe/ConsensusPipeDispatcher.java | 42 ----
.../pipe/consensuspipe/ConsensusPipeManager.java | 157 -------------
.../service/PipeConsensusRPCServiceProcessor.java | 3 +-
.../db/consensus/DataRegionConsensusImpl.java | 2 -
.../consensus/ConsensusPipeDataNodeDispatcher.java | 131 -----------
.../java/org/apache/iotdb/db/service/DataNode.java | 61 ++---
22 files changed, 426 insertions(+), 645 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 88e8d76001d..3abb322d084 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -77,6 +77,9 @@ public class ConfigNodeConfig {
/** Data region consensus protocol. */
private String dataRegionConsensusProtocolClass =
ConsensusFactory.IOT_CONSENSUS;
+ /** IoTConsensusV2 replicate mode: "batch" or "stream". */
+ private String iotConsensusV2Mode = "batch";
+
/** Default number of DataRegion replicas. */
private int dataReplicationFactor = 1;
@@ -530,6 +533,14 @@ public class ConfigNodeConfig {
this.dataRegionConsensusProtocolClass = dataRegionConsensusProtocolClass;
}
+ public String getIotConsensusV2Mode() {
+ return iotConsensusV2Mode;
+ }
+
+ public void setIotConsensusV2Mode(String iotConsensusV2Mode) {
+ this.iotConsensusV2Mode = iotConsensusV2Mode;
+ }
+
public int getDataRegionPerDataNode() {
return dataRegionPerDataNode;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 0ea7a278732..77790dae1a9 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -205,6 +205,9 @@ public class ConfigNodeDescriptor {
properties.getProperty(
"data_region_consensus_protocol_class",
conf.getDataRegionConsensusProtocolClass()));
+ conf.setIotConsensusV2Mode(
+ properties.getProperty("iot_consensus_v2_mode",
conf.getIotConsensusV2Mode()));
+
conf.setDataReplicationFactor(
Integer.parseInt(
properties.getProperty(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 0fe3abc79a7..646aaf66daf 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -1485,6 +1485,23 @@ public class ProcedureManager {
}
}
+ /**
+ * Submit a consensus pipe creation procedure without blocking. The
procedure will execute in the
+ * background. Failures are logged and can be repaired by the consensus pipe
guardian.
+ */
+ public void createConsensusPipeAsync(TCreatePipeReq req) {
+ try {
+ CreatePipeProcedureV2 procedure = new CreatePipeProcedureV2(req);
+ executor.submitProcedure(procedure);
+ LOGGER.info("Submitted async consensus pipe creation: {}",
req.getPipeName());
+ } catch (Exception e) {
+ LOGGER.warn(
+ "Failed to submit async consensus pipe creation for {}: {}",
+ req.getPipeName(),
+ e.getMessage());
+ }
+ }
+
public TSStatus createPipe(TCreatePipeReq req) {
try {
CreatePipeProcedureV2 procedure = new CreatePipeProcedureV2(req);
@@ -1579,6 +1596,21 @@ public class ProcedureManager {
}
}
+ /**
+ * Submit a consensus pipe drop procedure without blocking. The procedure
will execute in the
+ * background. Failures are logged and can be repaired by the consensus pipe
guardian.
+ */
+ public void dropConsensusPipeAsync(String pipeName) {
+ try {
+ DropPipeProcedureV2 procedure = new DropPipeProcedureV2(pipeName);
+ executor.submitProcedure(procedure);
+ LOGGER.info("Submitted async consensus pipe drop: {}", pipeName);
+ } catch (Exception e) {
+ LOGGER.warn(
+ "Failed to submit async consensus pipe drop for {}: {}", pipeName,
e.getMessage());
+ }
+ }
+
public TSStatus dropPipe(String pipeName) {
try {
DropPipeProcedureV2 procedure = new DropPipeProcedureV2(pipeName);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
index c0b81c2e43c..ea9c61cf45e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
@@ -85,19 +85,9 @@ public class PipeTaskCoordinator {
/**
* Unlock the pipe task coordinator. Calling this method will clear the pipe
task info holder,
* which means that the holder will be null after calling this method.
- *
- * @return {@code true} if successfully unlocked, {@code false} if current
thread is not holding
- * the lock.
*/
- public boolean unlock() {
- try {
- pipeTaskCoordinatorLock.unlock();
- return true;
- } catch (IllegalMonitorStateException ignored) {
- // This is thrown if unlock() is called without lock() called first.
- LOGGER.warn("This thread is not holding the lock.");
- return false;
- }
+ public void unlock() {
+ pipeTaskCoordinatorLock.unlock();
}
public boolean isLocked() {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
index 12b92619004..b86c556f20d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
@@ -22,24 +22,29 @@ package
org.apache.iotdb.confignode.manager.pipe.coordinator.task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
/**
- * {@link PipeTaskCoordinatorLock} is a cross thread lock for pipe task
coordinator. It is used to
+ * {@link PipeTaskCoordinatorLock} is a cross-thread lock for pipe task
coordinator. It is used to
* ensure that only one thread can execute the pipe task coordinator at the
same time.
+ *
+ * <p>Uses {@link Semaphore} instead of {@link
java.util.concurrent.locks.ReentrantLock} to support
+ * cross-thread acquire/release, which is required by the procedure recovery
mechanism: locks may be
+ * acquired on the StateMachineUpdater thread during {@code restoreLock()} and
released on a
+ * ProcedureCoreWorker thread after execution.
*/
public class PipeTaskCoordinatorLock {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTaskCoordinatorLock.class);
- private final ReentrantLock lock = new ReentrantLock();
+ private final Semaphore semaphore = new Semaphore(1);
public void lock() {
LOGGER.debug(
"PipeTaskCoordinator lock waiting for thread {}",
Thread.currentThread().getName());
try {
- lock.lockInterruptibly();
+ semaphore.acquire();
LOGGER.debug(
"PipeTaskCoordinator lock acquired by thread {}",
Thread.currentThread().getName());
} catch (final InterruptedException e) {
@@ -54,7 +59,7 @@ public class PipeTaskCoordinatorLock {
try {
LOGGER.debug(
"PipeTaskCoordinator lock waiting for thread {}",
Thread.currentThread().getName());
- if (lock.tryLock(10, TimeUnit.SECONDS)) {
+ if (semaphore.tryAcquire(10, TimeUnit.SECONDS)) {
LOGGER.debug(
"PipeTaskCoordinator lock acquired by thread {}",
Thread.currentThread().getName());
return true;
@@ -74,12 +79,12 @@ public class PipeTaskCoordinatorLock {
}
public void unlock() {
- lock.unlock();
+ semaphore.release();
LOGGER.debug(
"PipeTaskCoordinator lock released by thread {}",
Thread.currentThread().getName());
}
public boolean isLocked() {
- return lock.isLocked();
+ return semaphore.availablePermits() == 0;
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
index b52f958d30a..038167ae58c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
@@ -105,14 +105,8 @@ public class SubscriptionCoordinator {
subscriptionInfoHolder = null;
}
- try {
- coordinatorLock.unlock();
- return true;
- } catch (IllegalMonitorStateException ignored) {
- // This is thrown if unlock() is called without lock() called first.
- LOGGER.warn("This thread is not holding the lock.");
- return false;
- }
+ coordinatorLock.unlock();
+ return true;
}
public boolean isLocked() {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
index 928f3cfcd37..f827adea5d4 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
@@ -32,6 +32,9 @@ import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
import
org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
@@ -42,9 +45,12 @@ import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import
org.apache.iotdb.confignode.consensus.request.write.partition.AddRegionLocationPlan;
import
org.apache.iotdb.confignode.consensus.request.write.partition.RemoveRegionLocationPlan;
+import
org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.manager.ConfigManager;
import
org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupHeartbeatSample;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeResp;
@@ -64,6 +70,23 @@ import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_CONSENSUS_GROUP_ID_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_CONSENSUS_PIPE_NAME;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_IP_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PORT_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_CAPTURE_TABLE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_CAPTURE_TREE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_CONSENSUS_GROUP_ID_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_CONSENSUS_SENDER_DATANODE_ID_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_INCLUSION_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_MODE_KEY;
import static
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REGION_MIGRATE_PROCESS;
import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS;
import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS_V2;
@@ -275,22 +298,15 @@ public class RegionMaintainHandler {
TMaintainPeerReq maintainPeerReq =
new TMaintainPeerReq(regionId, originalDataNode, procedureId);
+ // Always use full retries regardless of node status, because after a
cluster crash the
+ // target DataNode may be Unknown but still in the process of restarting.
status =
-
configManager.getLoadManager().getNodeStatus(originalDataNode.getDataNodeId())
- == NodeStatus.Unknown
- ? (TSStatus)
- SyncDataNodeClientPool.getInstance()
- .sendSyncRequestToDataNodeWithGivenRetry(
- originalDataNode.getInternalEndPoint(),
- maintainPeerReq,
- CnToDnSyncRequestType.DELETE_OLD_REGION_PEER,
- 1)
- : (TSStatus)
- SyncDataNodeClientPool.getInstance()
- .sendSyncRequestToDataNodeWithRetry(
- originalDataNode.getInternalEndPoint(),
- maintainPeerReq,
- CnToDnSyncRequestType.DELETE_OLD_REGION_PEER);
+ (TSStatus)
+ SyncDataNodeClientPool.getInstance()
+ .sendSyncRequestToDataNodeWithRetry(
+ originalDataNode.getInternalEndPoint(),
+ maintainPeerReq,
+ CnToDnSyncRequestType.DELETE_OLD_REGION_PEER);
LOGGER.info(
"{}, Send action deleteOldRegionPeer finished, regionId: {},
dataNodeId: {}",
REGION_MIGRATE_PROCESS,
@@ -392,6 +408,205 @@ public class RegionMaintainHandler {
configManager.getLoadManager().getRouteBalancer().balanceRegionLeaderAndPriority();
}
+ /**
+ * Create bidirectional consensus pipes between the target DataNode and all
existing peers. Only
+ * applies to IoTConsensusV2 DataRegions. Called by AddRegionPeerProcedure
before
+ * DO_ADD_REGION_PEER so that pipes exist before the coordinator starts data
transfer.
+ */
+ public void createConsensusPipesForAddPeer(
+ TConsensusGroupId regionId, TDataNodeLocation targetDataNode) {
+ if (!isIoTConsensusV2DataRegion(regionId)) {
+ return;
+ }
+
+ List<TDataNodeLocation> existingLocations = findRegionLocations(regionId);
+ for (TDataNodeLocation existingLocation : existingLocations) {
+ if (existingLocation.getDataNodeId() == targetDataNode.getDataNodeId()) {
+ continue;
+ }
+ // Pipe: existingPeer → targetPeer
+ createSingleConsensusPipe(
+ regionId,
+ existingLocation.getDataNodeId(),
+ existingLocation.getDataRegionConsensusEndPoint(),
+ targetDataNode.getDataNodeId(),
+ targetDataNode.getDataRegionConsensusEndPoint());
+ // Pipe: targetPeer → existingPeer
+ createSingleConsensusPipe(
+ regionId,
+ targetDataNode.getDataNodeId(),
+ targetDataNode.getDataRegionConsensusEndPoint(),
+ existingLocation.getDataNodeId(),
+ existingLocation.getDataRegionConsensusEndPoint());
+ }
+ }
+
+ /**
+ * Create bidirectional consensus pipes among all peers for newly created
RegionGroups. Only
+ * applies to IoTConsensusV2 DataRegions. Called by
CreateRegionGroupsProcedure after all regions
+ * are activated.
+ */
+ public void createInitialConsensusPipes(CreateRegionGroupsPlan persistPlan) {
+ if (!IOT_CONSENSUS_V2.equals(CONF.getDataRegionConsensusProtocolClass())) {
+ return;
+ }
+
+ persistPlan
+ .getRegionGroupMap()
+ .forEach(
+ (database, regionReplicaSets) ->
+ regionReplicaSets.forEach(
+ regionReplicaSet -> {
+ TConsensusGroupId regionId =
regionReplicaSet.getRegionId();
+ if
(!TConsensusGroupType.DataRegion.equals(regionId.getType())) {
+ return;
+ }
+ List<TDataNodeLocation> locations =
regionReplicaSet.getDataNodeLocations();
+ for (int i = 0; i < locations.size(); i++) {
+ for (int j = 0; j < locations.size(); j++) {
+ if (i == j) {
+ continue;
+ }
+ createSingleConsensusPipeAsync(
+ regionId,
+ locations.get(i).getDataNodeId(),
+
locations.get(i).getDataRegionConsensusEndPoint(),
+ locations.get(j).getDataNodeId(),
+
locations.get(j).getDataRegionConsensusEndPoint());
+ }
+ }
+ }));
+ }
+
+ /**
+ * Drop consensus pipes related to the target DataNode for a region. Only
applies to
+ * IoTConsensusV2 DataRegions. Called by RemoveRegionPeerProcedure after
DELETE_OLD_REGION_PEER.
+ */
+ public void dropConsensusPipesForRemovePeer(
+ TConsensusGroupId regionId, TDataNodeLocation targetDataNode) {
+ if (!isIoTConsensusV2DataRegion(regionId)) {
+ return;
+ }
+
+ DataRegionId dataRegionId = new DataRegionId(regionId.getId());
+ List<TDataNodeLocation> existingLocations = findRegionLocations(regionId);
+ for (TDataNodeLocation existingLocation : existingLocations) {
+ if (existingLocation.getDataNodeId() == targetDataNode.getDataNodeId()) {
+ continue;
+ }
+ // Drop pipe: existingPeer → targetPeer
+ String pipeName1 =
+ new ConsensusPipeName(
+ dataRegionId, existingLocation.getDataNodeId(),
targetDataNode.getDataNodeId())
+ .toString();
+ configManager.getProcedureManager().dropConsensusPipeAsync(pipeName1);
+ // Drop pipe: targetPeer → existingPeer
+ String pipeName2 =
+ new ConsensusPipeName(
+ dataRegionId, targetDataNode.getDataNodeId(),
existingLocation.getDataNodeId())
+ .toString();
+ configManager.getProcedureManager().dropConsensusPipeAsync(pipeName2);
+ }
+ }
+
+ private boolean isIoTConsensusV2DataRegion(TConsensusGroupId regionId) {
+ return TConsensusGroupType.DataRegion.equals(regionId.getType())
+ && IOT_CONSENSUS_V2.equals(CONF.getDataRegionConsensusProtocolClass());
+ }
+
+ private TCreatePipeReq buildConsensusPipeReq(
+ TConsensusGroupId regionId,
+ int senderNodeId,
+ TEndPoint senderEndpoint,
+ int receiverNodeId,
+ TEndPoint receiverEndpoint) {
+ DataRegionId dataRegionId = new DataRegionId(regionId.getId());
+ ConsensusPipeName pipeName = new ConsensusPipeName(dataRegionId,
senderNodeId, receiverNodeId);
+
+ String replicateMode = CONF.getIotConsensusV2Mode();
+
+ Map<String, String> extractorAttributes = new HashMap<>();
+ extractorAttributes.put(EXTRACTOR_KEY,
BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName());
+ extractorAttributes.put(EXTRACTOR_INCLUSION_KEY, "data");
+ extractorAttributes.put(EXTRACTOR_CONSENSUS_GROUP_ID_KEY,
dataRegionId.toString());
+ extractorAttributes.put(
+ EXTRACTOR_CONSENSUS_SENDER_DATANODE_ID_KEY,
String.valueOf(senderNodeId));
+ extractorAttributes.put(
+ EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY,
String.valueOf(receiverNodeId));
+ extractorAttributes.put(EXTRACTOR_REALTIME_MODE_KEY, replicateMode);
+ extractorAttributes.put(EXTRACTOR_CAPTURE_TABLE_KEY, String.valueOf(true));
+ extractorAttributes.put(EXTRACTOR_CAPTURE_TREE_KEY, String.valueOf(true));
+ extractorAttributes.put(
+ EXTRACTOR_IOTDB_USER_KEY,
CommonDescriptor.getInstance().getConfig().getDefaultAdminName());
+
+ Map<String, String> processorAttributes = new HashMap<>();
+ processorAttributes.put(
+ PROCESSOR_KEY,
BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName());
+
+ Map<String, String> connectorAttributes = new HashMap<>();
+ connectorAttributes.put(
+ CONNECTOR_KEY,
BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName());
+ connectorAttributes.put(CONNECTOR_CONSENSUS_GROUP_ID_KEY,
String.valueOf(dataRegionId.getId()));
+ connectorAttributes.put(CONNECTOR_CONSENSUS_PIPE_NAME,
pipeName.toString());
+ connectorAttributes.put(CONNECTOR_IOTDB_IP_KEY, receiverEndpoint.ip);
+ connectorAttributes.put(CONNECTOR_IOTDB_PORT_KEY,
String.valueOf(receiverEndpoint.port));
+ connectorAttributes.put(CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
String.valueOf(1));
+ connectorAttributes.put(CONNECTOR_REALTIME_FIRST_KEY,
String.valueOf(false));
+
+ return new TCreatePipeReq()
+ .setPipeName(pipeName.toString())
+ .setNeedManuallyStart(false)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes)
+ .setConnectorAttributes(connectorAttributes);
+ }
+
+ /**
+ * Create a single consensus pipe synchronously (blocks until procedure
finishes or times out with
+ * optimistic success). Used by AddRegionPeerProcedure where pipes must
exist before data sync.
+ */
+ private void createSingleConsensusPipe(
+ TConsensusGroupId regionId,
+ int senderNodeId,
+ TEndPoint senderEndpoint,
+ int receiverNodeId,
+ TEndPoint receiverEndpoint) {
+ TCreatePipeReq req =
+ buildConsensusPipeReq(
+ regionId, senderNodeId, senderEndpoint, receiverNodeId,
receiverEndpoint);
+ TSStatus status =
configManager.getProcedureManager().createConsensusPipe(req);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ LOGGER.warn(
+ "{}, Failed to create consensus pipe {}: {}",
+ REGION_MIGRATE_PROCESS,
+ req.getPipeName(),
+ status);
+ } else {
+ LOGGER.info("{}, Created consensus pipe {}", REGION_MIGRATE_PROCESS,
req.getPipeName());
+ }
+ }
+
+ /**
+ * Create a single consensus pipe asynchronously (fire-and-forget). Used by
+ * CreateRegionGroupsProcedure where blocking would cause deadlock and new
regions have no data to
+ * lose.
+ */
+ private void createSingleConsensusPipeAsync(
+ TConsensusGroupId regionId,
+ int senderNodeId,
+ TEndPoint senderEndpoint,
+ int receiverNodeId,
+ TEndPoint receiverEndpoint) {
+ TCreatePipeReq req =
+ buildConsensusPipeReq(
+ regionId, senderNodeId, senderEndpoint, receiverNodeId,
receiverEndpoint);
+ configManager.getProcedureManager().createConsensusPipeAsync(req);
+ LOGGER.info(
+ "{}, Submitted async consensus pipe creation: {}",
+ REGION_MIGRATE_PROCESS,
+ req.getPipeName());
+ }
+
/**
* Find all DataNodes which contains the given regionId
*
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
index f1907edf754..d9cd2ad8817 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
@@ -94,6 +94,11 @@ public class AddRegionPeerProcedure extends
RegionOperationProcedure<AddRegionPe
if (status.getCode() != SUCCESS_STATUS.getStatusCode()) {
return warnAndRollBackAndNoMoreState(env, handler,
"CREATE_NEW_REGION_PEER fail");
}
+ setNextState(AddRegionPeerState.CREATE_CONSENSUS_PIPES);
+ break;
+ case CREATE_CONSENSUS_PIPES:
+ handler.createConsensusPipesForAddPeer(regionId, targetDataNode);
+ setKillPoint(state);
setNextState(AddRegionPeerState.DO_ADD_REGION_PEER);
break;
case DO_ADD_REGION_PEER:
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java
index 14e4be60ed9..17c8b2abdf4 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java
@@ -225,6 +225,12 @@ public class CreateRegionGroupsProcedure
}
}));
env.activateRegionGroup(activateRegionGroupMap);
+ setNextState(CreateRegionGroupsState.CREATE_INITIAL_CONSENSUS_PIPES);
+ break;
+ case CREATE_INITIAL_CONSENSUS_PIPES:
+ if (TConsensusGroupType.DataRegion.equals(consensusGroupType)) {
+
env.getRegionMaintainHandler().createInitialConsensusPipes(persistPlan);
+ }
setNextState(CreateRegionGroupsState.CREATE_REGION_GROUPS_FINISH);
break;
case CREATE_REGION_GROUPS_FINISH:
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
index f362a7a1008..9b4c29b095e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
@@ -45,14 +45,18 @@ import java.util.Objects;
import static org.apache.iotdb.commons.utils.KillPoint.KillPoint.setKillPoint;
import static
org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState.DELETE_OLD_REGION_PEER;
+import static
org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState.DROP_CONSENSUS_PIPES;
import static
org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState.REMOVE_REGION_LOCATION_CACHE;
import static
org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState.REMOVE_REGION_PEER;
import static org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS;
public class RemoveRegionPeerProcedure extends
RegionOperationProcedure<RemoveRegionPeerState> {
private static final Logger LOGGER =
LoggerFactory.getLogger(RemoveRegionPeerProcedure.class);
+ private static final int MAX_DELETE_OLD_REGION_PEER_RETRY = 3;
+ private static final long DELETE_OLD_REGION_PEER_RETRY_INTERVAL_MS = 5_000;
private TDataNodeLocation coordinator;
private TDataNodeLocation targetDataNode;
+ private transient int deleteOldRegionPeerAttempted = 0;
public RemoveRegionPeerProcedure() {
super();
@@ -129,23 +133,56 @@ public class RemoveRegionPeerProcedure extends
RegionOperationProcedure<RemoveRe
handler.submitDeleteOldRegionPeerTask(this.getProcId(),
targetDataNode, regionId);
setKillPoint(state);
if (tsStatus.getCode() != SUCCESS_STATUS.getStatusCode()) {
+ deleteOldRegionPeerAttempted++;
+ if (deleteOldRegionPeerAttempted <=
MAX_DELETE_OLD_REGION_PEER_RETRY) {
+ LOGGER.warn(
+ "[pid{}][RemoveRegion] DELETE_OLD_REGION_PEER task submitted
failed (attempt {}/{}), will retry after {}ms. {}",
+ getProcId(),
+ deleteOldRegionPeerAttempted,
+ MAX_DELETE_OLD_REGION_PEER_RETRY + 1,
+ DELETE_OLD_REGION_PEER_RETRY_INTERVAL_MS,
+ regionId);
+ Thread.sleep(DELETE_OLD_REGION_PEER_RETRY_INTERVAL_MS);
+ setNextState(DELETE_OLD_REGION_PEER);
+ return Flow.HAS_MORE_STATE;
+ }
LOGGER.warn(
- "[pid{}][RemoveRegion] DELETE_OLD_REGION_PEER task submitted
failed, procedure will continue. You should manually delete region file. {}",
+ "[pid{}][RemoveRegion] DELETE_OLD_REGION_PEER task submitted
failed after {} attempts, procedure will continue. You should manually delete
region file. {}",
getProcId(),
+ deleteOldRegionPeerAttempted + 1,
regionId);
- setNextState(REMOVE_REGION_LOCATION_CACHE);
+ setNextState(DROP_CONSENSUS_PIPES);
return Flow.HAS_MORE_STATE;
}
TRegionMigrateResult deleteOldRegionPeerResult =
handler.waitTaskFinish(this.getProcId(), targetDataNode);
if (deleteOldRegionPeerResult.getTaskStatus() !=
TRegionMaintainTaskStatus.SUCCESS) {
+ deleteOldRegionPeerAttempted++;
+ if (deleteOldRegionPeerAttempted <=
MAX_DELETE_OLD_REGION_PEER_RETRY) {
+ LOGGER.warn(
+ "[pid{}][RemoveRegion] DELETE_OLD_REGION_PEER executed
failed (attempt {}/{}), will retry after {}ms. {}",
+ getProcId(),
+ deleteOldRegionPeerAttempted,
+ MAX_DELETE_OLD_REGION_PEER_RETRY + 1,
+ DELETE_OLD_REGION_PEER_RETRY_INTERVAL_MS,
+ regionId);
+ Thread.sleep(DELETE_OLD_REGION_PEER_RETRY_INTERVAL_MS);
+ setNextState(DELETE_OLD_REGION_PEER);
+ return Flow.HAS_MORE_STATE;
+ }
LOGGER.warn(
- "[pid{}][RemoveRegion] DELETE_OLD_REGION_PEER executed failed,
procedure will continue. You should manually delete region file. {}",
+ "[pid{}][RemoveRegion] DELETE_OLD_REGION_PEER executed failed
after {} attempts, procedure will continue. You should manually delete region
file. {}",
getProcId(),
+ deleteOldRegionPeerAttempted + 1,
regionId);
- setNextState(REMOVE_REGION_LOCATION_CACHE);
+ setNextState(DROP_CONSENSUS_PIPES);
return Flow.HAS_MORE_STATE;
}
+ setNextState(DROP_CONSENSUS_PIPES);
+ break;
+ case DROP_CONSENSUS_PIPES:
+ handler.dropConsensusPipesForRemovePeer(regionId, targetDataNode);
+ setKillPoint(state);
setNextState(REMOVE_REGION_LOCATION_CACHE);
break;
case REMOVE_REGION_LOCATION_CACHE:
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/AddRegionPeerState.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/AddRegionPeerState.java
index 0e3477626b6..43fb405c221 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/AddRegionPeerState.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/AddRegionPeerState.java
@@ -23,4 +23,5 @@ public enum AddRegionPeerState {
CREATE_NEW_REGION_PEER,
DO_ADD_REGION_PEER,
UPDATE_REGION_LOCATION_CACHE,
+ CREATE_CONSENSUS_PIPES,
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java
index f8921092667..4ff90132f59 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java
@@ -35,5 +35,8 @@ public enum CreateRegionGroupsState {
// For DataRegionGroups that use iot consensus protocol, select leader by
the way
ACTIVATE_REGION_GROUPS,
- CREATE_REGION_GROUPS_FINISH
+ CREATE_REGION_GROUPS_FINISH,
+
+ // Create initial consensus pipes for IoTConsensusV2 DataRegionGroups.
+ CREATE_INITIAL_CONSENSUS_PIPES
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveRegionPeerState.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveRegionPeerState.java
index e9767972799..a9c463cbafe 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveRegionPeerState.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveRegionPeerState.java
@@ -24,4 +24,5 @@ public enum RemoveRegionPeerState {
REMOVE_REGION_PEER,
DELETE_OLD_REGION_PEER,
REMOVE_REGION_LOCATION_CACHE,
+ DROP_CONSENSUS_PIPES,
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java
index da06c60a624..c0d7257183d 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.consensus.config;
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
-import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeDispatcher;
import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeGuardian;
import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeReceiver;
import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeSelector;
@@ -244,7 +243,6 @@ public class PipeConsensusConfig {
private final String extractorPluginName;
private final String processorPluginName;
private final String connectorPluginName;
- private final ConsensusPipeDispatcher consensusPipeDispatcher;
private final ConsensusPipeGuardian consensusPipeGuardian;
private final ConsensusPipeSelector consensusPipeSelector;
private final ReplicateProgressManager replicateProgressManager;
@@ -255,7 +253,6 @@ public class PipeConsensusConfig {
String extractorPluginName,
String processorPluginName,
String connectorPluginName,
- ConsensusPipeDispatcher consensusPipeDispatcher,
ConsensusPipeGuardian consensusPipeGuardian,
ConsensusPipeSelector consensusPipeSelector,
ReplicateProgressManager replicateProgressManager,
@@ -264,7 +261,6 @@ public class PipeConsensusConfig {
this.extractorPluginName = extractorPluginName;
this.processorPluginName = processorPluginName;
this.connectorPluginName = connectorPluginName;
- this.consensusPipeDispatcher = consensusPipeDispatcher;
this.consensusPipeGuardian = consensusPipeGuardian;
this.consensusPipeSelector = consensusPipeSelector;
this.replicateProgressManager = replicateProgressManager;
@@ -284,10 +280,6 @@ public class PipeConsensusConfig {
return connectorPluginName;
}
- public ConsensusPipeDispatcher getConsensusPipeDispatcher() {
- return consensusPipeDispatcher;
- }
-
public ConsensusPipeGuardian getConsensusPipeGuardian() {
return consensusPipeGuardian;
}
@@ -318,7 +310,6 @@ public class PipeConsensusConfig {
BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName();
private String connectorPluginName =
BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName();
- private ConsensusPipeDispatcher consensusPipeDispatcher = null;
private ConsensusPipeGuardian consensusPipeGuardian = null;
private ConsensusPipeSelector consensusPipeSelector = null;
private ReplicateProgressManager replicateProgressManager = null;
@@ -340,12 +331,6 @@ public class PipeConsensusConfig {
return this;
}
- public Pipe.Builder setConsensusPipeDispatcher(
- ConsensusPipeDispatcher consensusPipeDispatcher) {
- this.consensusPipeDispatcher = consensusPipeDispatcher;
- return this;
- }
-
public Pipe.Builder setConsensusPipeGuardian(ConsensusPipeGuardian
consensusPipeGuardian) {
this.consensusPipeGuardian = consensusPipeGuardian;
return this;
@@ -378,7 +363,6 @@ public class PipeConsensusConfig {
extractorPluginName,
processorPluginName,
connectorPluginName,
- consensusPipeDispatcher,
consensusPipeGuardian,
consensusPipeSelector,
replicateProgressManager,
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
index 762f338ad96..33d73d673bf 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
@@ -51,8 +51,8 @@ import
org.apache.iotdb.consensus.exception.IllegalPeerNumException;
import
org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeGuardian;
-import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeManager;
import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
+import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeSelector;
import org.apache.iotdb.consensus.pipe.service.PipeConsensusRPCService;
import
org.apache.iotdb.consensus.pipe.service.PipeConsensusRPCServiceProcessor;
import org.apache.iotdb.rpc.RpcUtils;
@@ -101,7 +101,7 @@ public class PipeConsensus implements IConsensus {
new ConcurrentHashMap<>();
private final ReentrantReadWriteLock stateMachineMapLock = new
ReentrantReadWriteLock();
private final PipeConsensusConfig config;
- private final ConsensusPipeManager consensusPipeManager;
+ private final ConsensusPipeSelector consensusPipeSelector;
private final ConsensusPipeGuardian consensusPipeGuardian;
private final IClientManager<TEndPoint, AsyncPipeConsensusServiceClient>
asyncClientManager;
private final IClientManager<TEndPoint, SyncPipeConsensusServiceClient>
syncClientManager;
@@ -114,10 +114,8 @@ public class PipeConsensus implements IConsensus {
this.config = config.getPipeConsensusConfig();
this.registry = registry;
this.rpcService = new PipeConsensusRPCService(thisNode,
config.getPipeConsensusConfig());
- this.consensusPipeManager =
- new ConsensusPipeManager(
- config.getPipeConsensusConfig().getPipe(),
- config.getPipeConsensusConfig().getReplicateMode());
+ this.consensusPipeSelector =
+ config.getPipeConsensusConfig().getPipe().getConsensusPipeSelector();
this.consensusPipeGuardian =
config.getPipeConsensusConfig().getPipe().getConsensusPipeGuardian();
this.asyncClientManager =
@@ -177,7 +175,6 @@ public class PipeConsensus implements IConsensus {
registry.apply(consensusGroupId),
new ArrayList<>(),
config,
- consensusPipeManager,
syncClientManager);
stateMachineMap.put(consensusGroupId, consensus);
checkPeerListAndStartIfEligible(consensusGroupId,
consensus);
@@ -220,14 +217,14 @@ public class PipeConsensus implements IConsensus {
// make peers which are in list correct
resetPeerListWithoutThrow.accept(
consensusGroupId,
correctPeerListBeforeStart.get(consensusGroupId));
- consensus.start(true);
+ consensus.start();
} else {
// clear peers which are not in the list
resetPeerListWithoutThrow.accept(consensusGroupId,
Collections.emptyList());
}
} else {
- consensus.start(true);
+ consensus.start();
}
}
@@ -243,7 +240,7 @@ public class PipeConsensus implements IConsensus {
private void checkAllConsensusPipe() {
final Map<ConsensusGroupId, Map<ConsensusPipeName, PipeStatus>>
existedPipes =
- consensusPipeManager.getAllConsensusPipe().entrySet().stream()
+ consensusPipeSelector.getAllConsensusPipe().entrySet().stream()
.filter(entry -> entry.getKey().getSenderDataNodeId() ==
thisNodeId)
.collect(
Collectors.groupingBy(
@@ -254,25 +251,16 @@ public class PipeConsensus implements IConsensus {
stateMachineMap.forEach(
(key, value) ->
value.checkConsensusPipe(existedPipes.getOrDefault(key,
ImmutableMap.of())));
+ // Log orphaned pipes (region no longer exists locally); ConfigNode
handles actual cleanup.
existedPipes.entrySet().stream()
.filter(entry -> !stateMachineMap.containsKey(entry.getKey()))
.flatMap(entry -> entry.getValue().keySet().stream())
.forEach(
- consensusPipeName -> {
- try {
+ consensusPipeName ->
LOGGER.warn(
- "{} drop consensus pipe [{}]",
+ "{} orphaned consensus pipe [{}] found, should be
dropped by ConfigNode",
consensusPipeName.getConsensusGroupId(),
- consensusPipeName);
- consensusPipeManager.updateConsensusPipe(consensusPipeName,
PipeStatus.DROPPED);
- } catch (Exception e) {
- LOGGER.warn(
- "{} cannot drop consensus pipe [{}]",
- consensusPipeName.getConsensusGroupId(),
- consensusPipeName,
- e);
- }
- });
+ consensusPipeName));
} finally {
stateMachineMapLock.writeLock().unlock();
}
@@ -347,10 +335,9 @@ public class PipeConsensus implements IConsensus {
registry.apply(groupId),
peers,
config,
- consensusPipeManager,
syncClientManager);
stateMachineMap.put(groupId, consensus);
- consensus.start(false); // pipe will start after creating
+ consensus.start();
KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_CREATE_LOCAL_PEER);
} catch (IOException e) {
LOGGER.warn("Cannot create local peer for group {} with peers {}",
groupId, peers, e);
@@ -511,7 +498,7 @@ public class PipeConsensus implements IConsensus {
for (Peer peer : correctPeers) {
if (!impl.containsPeer(peer) && peer.getNodeId() != this.thisNodeId) {
try {
- impl.createConsensusPipeToTargetPeer(peer, false);
+ impl.createConsensusPipeToTargetPeer(peer);
LOGGER.info("[RESET PEER LIST] {} Build sync channel with: {}",
groupId, peer);
} catch (ConsensusGroupModifyPeerException e) {
LOGGER.warn(
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
index 23030b89bd4..7aed02e075f 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
@@ -40,7 +40,6 @@ import
org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.config.PipeConsensusConfig;
import org.apache.iotdb.consensus.config.PipeConsensusConfig.ReplicateMode;
import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
-import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeManager;
import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
import org.apache.iotdb.consensus.pipe.consensuspipe.ReplicateProgressManager;
import org.apache.iotdb.consensus.pipe.metric.PipeConsensusServerMetrics;
@@ -63,11 +62,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -79,8 +76,6 @@ public class PipeConsensusServerImpl {
private static final long
CHECK_TRANSMISSION_COMPLETION_INTERVAL_IN_MILLISECONDS = 2_000L;
private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS
=
PerformanceOverviewMetrics.getInstance();
- private static final long RETRY_WAIT_TIME_IN_MS = 500;
- private static final long MAX_RETRY_TIMES = 20;
private final Peer thisNode;
private final IStateMachine stateMachine;
private final Lock stateMachineLock = new ReentrantLock();
@@ -88,7 +83,6 @@ public class PipeConsensusServerImpl {
private final AtomicBoolean active;
private final AtomicBoolean isStarted;
private final String consensusGroupId;
- private final ConsensusPipeManager consensusPipeManager;
private final ReplicateProgressManager replicateProgressManager;
private final IClientManager<TEndPoint, SyncPipeConsensusServiceClient>
syncClientManager;
private final PipeConsensusServerMetrics pipeConsensusServerMetrics;
@@ -101,7 +95,6 @@ public class PipeConsensusServerImpl {
IStateMachine stateMachine,
List<Peer> peers,
PipeConsensusConfig config,
- ConsensusPipeManager consensusPipeManager,
IClientManager<TEndPoint, SyncPipeConsensusServiceClient>
syncClientManager)
throws IOException {
this.thisNode = thisNode;
@@ -110,85 +103,31 @@ public class PipeConsensusServerImpl {
this.active = new AtomicBoolean(true);
this.isStarted = new AtomicBoolean(false);
this.consensusGroupId = thisNode.getGroupId().toString();
- this.consensusPipeManager = consensusPipeManager;
this.replicateProgressManager = config.getPipe().getProgressIndexManager();
this.syncClientManager = syncClientManager;
this.pipeConsensusServerMetrics = new PipeConsensusServerMetrics(this);
this.replicateMode = config.getReplicateMode();
- // if peers is empty, the `resetPeerList` will automatically fetch correct
peers' info from CN.
- if (!peers.isEmpty()) {
- // create consensus pipes
- Set<Peer> deepCopyPeersWithoutSelf =
- peers.stream().filter(peer ->
!peer.equals(thisNode)).collect(Collectors.toSet());
- final List<Peer> successfulPipes =
createConsensusPipes(deepCopyPeersWithoutSelf);
- if (successfulPipes.size() < deepCopyPeersWithoutSelf.size()) {
- // roll back
- updateConsensusPipesStatus(successfulPipes, PipeStatus.DROPPED);
- throw new IOException(String.format("%s cannot create all consensus
pipes", thisNode));
- }
- }
+ // Consensus pipe creation is fully delegated to ConfigNode to avoid
deadlocks between
+ // DataNode RPC handlers and ConfigNode's PipeTaskCoordinatorLock.
ConfigNode proactively
+ // creates consensus pipes at key lifecycle points:
+ // 1. New DataRegion creation: via CreatePipeProcedureV2 in
CreateRegionGroupsProcedure
+ // 2. Region migration addPeer: via CREATE_CONSENSUS_PIPES state in
AddRegionPeerProcedure
}
- @SuppressWarnings("java:S2276")
- public synchronized void start(boolean startConsensusPipes) throws
IOException {
+ public synchronized void start() throws IOException {
stateMachine.start();
MetricService.getInstance().addMetricSet(this.pipeConsensusServerMetrics);
-
- if (startConsensusPipes) {
- // start all consensus pipes
- final List<Peer> otherPeers = peerManager.getOtherPeers(thisNode);
- List<Peer> failedPipes =
- updateConsensusPipesStatus(new ArrayList<>(otherPeers),
PipeStatus.RUNNING);
- // considering procedure can easily time out, keep trying
updateConsensusPipesStatus until all
- // consensus pipes are started gracefully or exceed the maximum number
of attempts.
- // NOTE: start pipe procedure is idempotent guaranteed.
- try {
- for (int i = 0; i < MAX_RETRY_TIMES && !failedPipes.isEmpty(); i++) {
- failedPipes = updateConsensusPipesStatus(failedPipes,
PipeStatus.RUNNING);
- Thread.sleep(RETRY_WAIT_TIME_IN_MS);
- }
- } catch (InterruptedException e) {
- LOGGER.warn(
- "PipeConsensusImpl-peer{}: pipeConsensusImpl thread get
interrupted when start consensus pipe. May because IoTDB process is killed.",
- thisNode);
- throw new IOException(String.format("%s cannot start all consensus
pipes", thisNode));
- }
- // if there still are some consensus pipes failed to start, throw an
exception.
- if (!failedPipes.isEmpty()) {
- // roll back
- List<Peer> successfulPipes = new ArrayList<>(otherPeers);
- successfulPipes.removeAll(failedPipes);
- updateConsensusPipesStatus(successfulPipes, PipeStatus.STOPPED);
- throw new IOException(String.format("%s cannot start all consensus
pipes", thisNode));
- }
- }
isStarted.set(true);
}
public synchronized void stop() {
- // stop all consensus pipes
- final List<Peer> otherPeers = peerManager.getOtherPeers(thisNode);
- final List<Peer> failedPipes =
- updateConsensusPipesStatus(new ArrayList<>(otherPeers),
PipeStatus.STOPPED);
- if (!failedPipes.isEmpty()) {
- // do not roll back, because it will stop anyway
- LOGGER.warn("{} cannot stop all consensus pipes", thisNode);
- }
MetricService.getInstance().removeMetricSet(this.pipeConsensusServerMetrics);
stateMachine.stop();
isStarted.set(false);
}
public synchronized void clear() {
- final List<Peer> otherPeers = peerManager.getOtherPeers(thisNode);
- final List<Peer> failedPipes =
- updateConsensusPipesStatus(new ArrayList<>(otherPeers),
PipeStatus.DROPPED);
- if (!failedPipes.isEmpty()) {
- // do not roll back, because it will clear anyway
- LOGGER.warn("{} cannot drop all consensus pipes", thisNode);
- }
-
MetricService.getInstance().removeMetricSet(this.pipeConsensusServerMetrics);
peerManager.clear();
stateMachine.stop();
@@ -196,55 +135,10 @@ public class PipeConsensusServerImpl {
active.set(false);
}
- private List<Peer> createConsensusPipes(Set<Peer> peers) {
- return peers.stream()
- .filter(
- peer -> {
- try {
- if (!peers.equals(thisNode)) {
- consensusPipeManager.createConsensusPipe(thisNode, peer);
- }
- return true;
- } catch (Exception e) {
- LOGGER.warn(
- "{}: cannot create consensus pipe between {} and {}",
- e.getMessage(),
- thisNode,
- peer,
- e);
- return false;
- }
- })
- .collect(Collectors.toList());
- }
-
/**
- * update given consensus pipes' status, returns the peer corresponding to
the pipe that failed to
- * update
+ * Detect inconsistencies between expected and existed consensus pipes.
Actual remediation
+ * (create/drop/update) is handled by ConfigNode; this method only logs
warnings.
*/
- private List<Peer> updateConsensusPipesStatus(List<Peer> peers, PipeStatus
status) {
- return peers.stream()
- .filter(
- peer -> {
- try {
- if (!peer.equals(thisNode)) {
- consensusPipeManager.updateConsensusPipe(
- new ConsensusPipeName(thisNode, peer), status);
- }
- return false;
- } catch (Exception e) {
- LOGGER.warn(
- "{}: cannot update consensus pipe between {} and {} to
status {}",
- e.getMessage(),
- thisNode,
- peer,
- status);
- return true;
- }
- })
- .collect(Collectors.toList());
- }
-
public synchronized void checkConsensusPipe(Map<ConsensusPipeName,
PipeStatus> existedPipes) {
final PipeStatus expectedStatus = isStarted.get() ? PipeStatus.RUNNING :
PipeStatus.STOPPED;
final Map<ConsensusPipeName, Peer> expectedPipes =
@@ -256,54 +150,27 @@ public class PipeConsensusServerImpl {
existedPipes.forEach(
(existedName, existedStatus) -> {
if (!expectedPipes.containsKey(existedName)) {
- try {
- LOGGER.warn("{} drop consensus pipe [{}]", consensusGroupId,
existedName);
- consensusPipeManager.updateConsensusPipe(existedName,
PipeStatus.DROPPED);
- } catch (Exception e) {
- LOGGER.warn("{} cannot drop consensus pipe [{}]",
consensusGroupId, existedName, e);
- }
+ LOGGER.warn(
+ "{} unexpected consensus pipe [{}] exists, should be dropped
by ConfigNode",
+ consensusGroupId,
+ existedName);
} else if (!expectedStatus.equals(existedStatus)) {
- try {
- LOGGER.warn(
- "{} update consensus pipe [{}] to status {}",
- consensusGroupId,
- existedName,
- expectedStatus);
- if (expectedStatus.equals(PipeStatus.RUNNING)) {
- // Do nothing. Because Pipe framework's metaSync will do that.
- return;
- }
- consensusPipeManager.updateConsensusPipe(existedName,
expectedStatus);
- } catch (Exception e) {
- LOGGER.warn(
- "{} cannot update consensus pipe [{}] to status {}",
- consensusGroupId,
- existedName,
- expectedStatus,
- e);
- }
+ LOGGER.warn(
+ "{} consensus pipe [{}] status mismatch: expected={},
actual={}",
+ consensusGroupId,
+ existedName,
+ expectedStatus,
+ existedStatus);
}
});
expectedPipes.forEach(
(expectedName, expectedPeer) -> {
if (!existedPipes.containsKey(expectedName)) {
- try {
- LOGGER.warn(
- "{} create and update consensus pipe [{}] to status {}",
- consensusGroupId,
- expectedName,
- expectedStatus);
- consensusPipeManager.createConsensusPipe(thisNode, expectedPeer);
- consensusPipeManager.updateConsensusPipe(expectedName,
expectedStatus);
- } catch (Exception e) {
- LOGGER.warn(
- "{} cannot create and update consensus pipe [{}] to status
{}",
- consensusGroupId,
- expectedName,
- expectedStatus,
- e);
- }
+ LOGGER.warn(
+ "{} consensus pipe [{}] missing, should be created by
ConfigNode",
+ consensusGroupId,
+ expectedName);
}
});
}
@@ -427,7 +294,7 @@ public class PipeConsensusServerImpl {
try {
// This node which acts as coordinator will transfer complete historical
snapshot to new
// target.
- createConsensusPipeToTargetPeer(targetPeer, false);
+ createConsensusPipeToTargetPeer(targetPeer);
} catch (Exception e) {
LOGGER.warn(
"{} cannot create consensus pipe to {}, may because target peer is
unknown currently, please manually check!",
@@ -438,17 +305,11 @@ public class PipeConsensusServerImpl {
}
}
- public synchronized void createConsensusPipeToTargetPeer(
- Peer targetPeer, boolean needManuallyStart) throws
ConsensusGroupModifyPeerException {
- try {
- KillPoint.setKillPoint(DataNodeKillPoints.ORIGINAL_ADD_PEER_DONE);
- consensusPipeManager.createConsensusPipe(thisNode, targetPeer,
needManuallyStart);
- peerManager.addPeer(targetPeer);
- } catch (Exception e) {
- LOGGER.warn("{} cannot create consensus pipe to {}", thisNode,
targetPeer, e);
- throw new ConsensusGroupModifyPeerException(
- String.format("%s cannot create consensus pipe to %s", thisNode,
targetPeer), e);
- }
+ public synchronized void createConsensusPipeToTargetPeer(Peer targetPeer)
+ throws ConsensusGroupModifyPeerException {
+ KillPoint.setKillPoint(DataNodeKillPoints.ORIGINAL_ADD_PEER_DONE);
+ // Pipe creation is delegated to ConfigNode; only update local peer list.
+ peerManager.addPeer(targetPeer);
}
public void notifyPeersToDropConsensusPipe(Peer targetPeer)
@@ -493,32 +354,8 @@ public class PipeConsensusServerImpl {
public synchronized void dropConsensusPipeToTargetPeer(Peer targetPeer)
throws ConsensusGroupModifyPeerException {
- try {
- consensusPipeManager.dropConsensusPipe(thisNode, targetPeer);
- peerManager.removePeer(targetPeer);
- } catch (Exception e) {
- LOGGER.warn("{} cannot drop consensus pipe to {}", thisNode, targetPeer,
e);
- throw new ConsensusGroupModifyPeerException(
- String.format("%s cannot drop consensus pipe to %s", thisNode,
targetPeer), e);
- }
- }
-
- public void startOtherConsensusPipesToTargetPeer(Peer targetPeer)
- throws ConsensusGroupModifyPeerException {
- final List<Peer> otherPeers = peerManager.getOtherPeers(thisNode);
- for (Peer peer : otherPeers) {
- if (peer.equals(targetPeer)) {
- continue;
- }
- try {
- consensusPipeManager.updateConsensusPipe(
- new ConsensusPipeName(peer, targetPeer), PipeStatus.RUNNING);
- } catch (Exception e) {
- // just warn but not throw exceptions. Because there may exist unknown
nodes in consensus
- // group
- LOGGER.warn("{} cannot start consensus pipe to {}", peer, targetPeer,
e);
- }
- }
+ // Pipe drop is delegated to ConfigNode; only update local peer list.
+ peerManager.removePeer(targetPeer);
}
/** Wait for the user written data up to firstCheck to be replicated */
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeDispatcher.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeDispatcher.java
deleted file mode 100644
index 568f68bb577..00000000000
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeDispatcher.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.consensus.pipe.consensuspipe;
-
-import java.util.Map;
-
-public interface ConsensusPipeDispatcher {
- void createPipe(
- String pipeName,
- Map<String, String> extractorAttributes,
- Map<String, String> processorAttributes,
- Map<String, String> connectorAttributes,
- boolean needManuallyStart)
- throws Exception;
-
- void startPipe(String pipeName) throws Exception;
-
- void stopPipe(String pipeName) throws Exception;
-
- /**
- * Use ConsensusPipeName instead of String to provide information for
receiverAgent to release
- * corresponding resource
- */
- void dropPipe(ConsensusPipeName pipeName) throws Exception;
-}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
deleted file mode 100644
index c1aef74a4b4..00000000000
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.consensus.pipe.consensuspipe;
-
-import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
-import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.config.PipeConsensusConfig;
-import org.apache.iotdb.consensus.config.PipeConsensusConfig.ReplicateMode;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.tsfile.external.commons.lang3.tuple.ImmutableTriple;
-import org.apache.tsfile.external.commons.lang3.tuple.Triple;
-
-import java.util.Map;
-
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_CONSENSUS_GROUP_ID_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_CONSENSUS_PIPE_NAME;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_IP_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PORT_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_CAPTURE_TABLE_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_CAPTURE_TREE_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_CONSENSUS_GROUP_ID_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_CONSENSUS_SENDER_DATANODE_ID_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_INCLUSION_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_MODE_KEY;
-
-public class ConsensusPipeManager {
- // Extract data.insert and data.delete to support deletion.
- private static final String CONSENSUS_EXTRACTOR_INCLUSION_VALUE = "data";
- private final PipeConsensusConfig.Pipe config;
- private final ReplicateMode replicateMode;
- private final ConsensusPipeDispatcher dispatcher;
- private final ConsensusPipeSelector selector;
-
- public ConsensusPipeManager(PipeConsensusConfig.Pipe config, ReplicateMode
replicateMode) {
- this.config = config;
- this.replicateMode = replicateMode;
- this.dispatcher = config.getConsensusPipeDispatcher();
- this.selector = config.getConsensusPipeSelector();
- }
-
- /** This method is used except region migration. */
- public void createConsensusPipe(Peer senderPeer, Peer receiverPeer) throws
Exception {
- ConsensusPipeName consensusPipeName = new ConsensusPipeName(senderPeer,
receiverPeer);
- // The third parameter is only used when region migration. Since this
method is not called by
- // region migration, just pass senderPeer in to get the correct result.
- Triple<ImmutableMap<String, String>, ImmutableMap<String, String>,
ImmutableMap<String, String>>
- params = buildPipeParams(senderPeer, receiverPeer);
- dispatcher.createPipe(
- consensusPipeName.toString(),
- params.getLeft(),
- params.getMiddle(),
- params.getRight(),
- false);
- }
-
- /** This method is used when executing region migration */
- public void createConsensusPipe(Peer senderPeer, Peer receiverPeer, boolean
needManuallyStart)
- throws Exception {
- ConsensusPipeName consensusPipeName = new ConsensusPipeName(senderPeer,
receiverPeer);
- Triple<ImmutableMap<String, String>, ImmutableMap<String, String>,
ImmutableMap<String, String>>
- params = buildPipeParams(senderPeer, receiverPeer);
- dispatcher.createPipe(
- consensusPipeName.toString(),
- params.getLeft(),
- params.getMiddle(),
- params.getRight(),
- needManuallyStart);
- }
-
- public Triple<
- ImmutableMap<String, String>, ImmutableMap<String, String>,
ImmutableMap<String, String>>
- buildPipeParams(final Peer senderPeer, final Peer receiverPeer) {
- final ConsensusPipeName consensusPipeName = new
ConsensusPipeName(senderPeer, receiverPeer);
- return new ImmutableTriple<>(
- ImmutableMap.<String, String>builder()
- .put(EXTRACTOR_KEY, config.getExtractorPluginName())
- .put(EXTRACTOR_INCLUSION_KEY, CONSENSUS_EXTRACTOR_INCLUSION_VALUE)
- .put(
- EXTRACTOR_CONSENSUS_GROUP_ID_KEY,
- consensusPipeName.getConsensusGroupId().toString())
- .put(
- EXTRACTOR_CONSENSUS_SENDER_DATANODE_ID_KEY,
- String.valueOf(consensusPipeName.getSenderDataNodeId()))
- .put(
- EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY,
- String.valueOf(consensusPipeName.getReceiverDataNodeId()))
- .put(EXTRACTOR_REALTIME_MODE_KEY, replicateMode.getValue())
- .put(EXTRACTOR_CAPTURE_TABLE_KEY, String.valueOf(true))
- .put(EXTRACTOR_CAPTURE_TREE_KEY, String.valueOf(true))
- .put(
- EXTRACTOR_IOTDB_USER_KEY,
-
CommonDescriptor.getInstance().getConfig().getDefaultAdminName())
- .build(),
- ImmutableMap.<String, String>builder()
- .put(PROCESSOR_KEY, config.getProcessorPluginName())
- .build(),
- ImmutableMap.<String, String>builder()
- .put(CONNECTOR_KEY, config.getConnectorPluginName())
- .put(
- CONNECTOR_CONSENSUS_GROUP_ID_KEY,
-
String.valueOf(consensusPipeName.getConsensusGroupId().getId()))
- .put(CONNECTOR_CONSENSUS_PIPE_NAME, consensusPipeName.toString())
- .put(CONNECTOR_IOTDB_IP_KEY, receiverPeer.getEndpoint().ip)
- .put(CONNECTOR_IOTDB_PORT_KEY,
String.valueOf(receiverPeer.getEndpoint().port))
- .put(CONNECTOR_IOTDB_PARALLEL_TASKS_KEY, String.valueOf(1))
- .put(CONNECTOR_REALTIME_FIRST_KEY, String.valueOf(false))
- .build());
- }
-
- public void dropConsensusPipe(Peer senderPeer, Peer receiverPeer) throws
Exception {
- ConsensusPipeName consensusPipeName = new ConsensusPipeName(senderPeer,
receiverPeer);
- dispatcher.dropPipe(consensusPipeName);
- }
-
- public void updateConsensusPipe(ConsensusPipeName consensusPipeName,
PipeStatus pipeStatus)
- throws Exception {
- if (PipeStatus.RUNNING.equals(pipeStatus)) {
- dispatcher.startPipe(consensusPipeName.toString());
- } else if (PipeStatus.STOPPED.equals(pipeStatus)) {
- dispatcher.stopPipe(consensusPipeName.toString());
- } else if (PipeStatus.DROPPED.equals(pipeStatus)) {
- dispatcher.dropPipe(consensusPipeName);
- } else {
- throw new IllegalArgumentException("Unsupported pipe status: " +
pipeStatus);
- }
- }
-
- public Map<ConsensusPipeName, PipeStatus> getAllConsensusPipe() {
- return selector.getAllConsensusPipe();
- }
-}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java
index 3aa69af6ff5..b99abc27dfc 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java
@@ -125,8 +125,7 @@ public class PipeConsensusRPCServiceProcessor implements
PipeConsensusIService.I
new Peer(
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.targetPeerConsensusGroupId),
req.targetPeerNodeId,
- req.targetPeerEndPoint),
- false);
+ req.targetPeerEndPoint));
responseStatus = new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (ConsensusGroupModifyPeerException e) {
responseStatus = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index 700fd79e5eb..a33cdd11024 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -44,7 +44,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.consensus.statemachine.dataregion.DataRegionStateMachine;
import
org.apache.iotdb.db.consensus.statemachine.dataregion.IoTConsensusDataRegionStateMachine;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
-import org.apache.iotdb.db.pipe.consensus.ConsensusPipeDataNodeDispatcher;
import
org.apache.iotdb.db.pipe.consensus.ConsensusPipeDataNodeRuntimeAgentGuardian;
import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
@@ -188,7 +187,6 @@ public class DataRegionConsensusImpl {
.setConnectorPluginName(
BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName())
// name
- .setConsensusPipeDispatcher(new
ConsensusPipeDataNodeDispatcher())
.setConsensusPipeGuardian(new
ConsensusPipeDataNodeRuntimeAgentGuardian())
.setConsensusPipeSelector(
() ->
PipeDataNodeAgent.task().getAllConsensusPipe())
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeDispatcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeDispatcher.java
deleted file mode 100644
index 02179a29f56..00000000000
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeDispatcher.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.consensus;
-
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.client.IClientManager;
-import org.apache.iotdb.commons.consensus.ConfigRegionId;
-import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
-import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeDispatcher;
-import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
-import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
-import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
-import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
-import org.apache.iotdb.pipe.api.exception.PipeException;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeRPCMessageConstant.PIPE_ALREADY_EXIST_MSG;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeRPCMessageConstant.PIPE_NOT_EXIST_MSG;
-
-public class ConsensusPipeDataNodeDispatcher implements
ConsensusPipeDispatcher {
- private static final Logger LOGGER =
- LoggerFactory.getLogger(ConsensusPipeDataNodeDispatcher.class);
-
- private static final IClientManager<ConfigRegionId, ConfigNodeClient>
CONFIG_NODE_CLIENT_MANAGER =
- ConfigNodeClientManager.getInstance();
-
- @Override
- public void createPipe(
- String pipeName,
- Map<String, String> extractorAttributes,
- Map<String, String> processorAttributes,
- Map<String, String> connectorAttributes,
- boolean needManuallyStart)
- throws Exception {
- try (ConfigNodeClient configNodeClient =
-
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
- TCreatePipeReq req =
- new TCreatePipeReq()
- .setPipeName(pipeName)
- .setNeedManuallyStart(needManuallyStart)
- .setExtractorAttributes(extractorAttributes)
- .setProcessorAttributes(processorAttributes)
- .setConnectorAttributes(connectorAttributes);
- TSStatus status = configNodeClient.createPipe(req);
- if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != status.getCode()) {
- LOGGER.warn("Failed to create consensus pipe-{}, status: {}",
pipeName, status);
- // ignore idempotence logic
- if (status.getMessage().contains(PIPE_ALREADY_EXIST_MSG)) {
- return;
- }
- throw new PipeException(status.getMessage());
- }
- } catch (Exception e) {
- LOGGER.warn("Failed to create consensus pipe-{}", pipeName, e);
- throw new PipeException("Failed to create consensus pipe", e);
- }
- }
-
- @Override
- public void startPipe(String pipeName) throws Exception {
- try (ConfigNodeClient configNodeClient =
-
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
- TSStatus status = configNodeClient.startPipe(pipeName);
- if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != status.getCode()) {
- LOGGER.warn("Failed to start consensus pipe-{}, status: {}", pipeName,
status);
- throw new PipeException(status.getMessage());
- }
- } catch (Exception e) {
- LOGGER.warn("Failed to start consensus pipe-{}", pipeName, e);
- throw new PipeException("Failed to start consensus pipe", e);
- }
- }
-
- @Override
- public void stopPipe(String pipeName) throws Exception {
- try (ConfigNodeClient configNodeClient =
-
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
- final TSStatus status = configNodeClient.stopPipe(pipeName);
- if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != status.getCode()) {
- LOGGER.warn("Failed to stop consensus pipe-{}, status: {}", pipeName,
status);
- throw new PipeException(status.getMessage());
- }
- } catch (Exception e) {
- LOGGER.warn("Failed to stop consensus pipe-{}", pipeName, e);
- throw new PipeException("Failed to stop consensus pipe", e);
- }
- }
-
- // Use ConsensusPipeName instead of String to provide information for
receiverAgent to release
- // corresponding resource
- @Override
- public void dropPipe(ConsensusPipeName pipeName) throws Exception {
- try (ConfigNodeClient configNodeClient =
-
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
- final TSStatus status = configNodeClient.dropPipe(pipeName.toString());
- if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != status.getCode()) {
- LOGGER.warn("Failed to drop consensus pipe-{}, status: {}", pipeName,
status);
- // ignore idempotence logic
- if (status.getMessage().contains(PIPE_NOT_EXIST_MSG)) {
- return;
- }
- throw new PipeException(status.getMessage());
- }
- } catch (Exception e) {
- LOGGER.warn("Failed to drop consensus pipe-{}", pipeName, e);
- throw new PipeException("Failed to drop consensus pipe", e);
- }
- }
-}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index eaa4fa0a4e9..2d7cbe18358 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -303,26 +303,6 @@ public class DataNode extends ServerCommandLine implements
DataNodeMBean {
thisNode);
DNAuditLogger.getInstance().log(fields, () -> logMessage);
}
-
- if (isUsingPipeConsensus()) {
- long dataRegionStartTime = System.currentTimeMillis();
- while (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions())
{
- try {
- TimeUnit.MILLISECONDS.sleep(1000);
- } catch (InterruptedException e) {
- logger.warn("IoTDB DataNode failed to set up.", e);
- Thread.currentThread().interrupt();
- return;
- }
- }
- DataRegionConsensusImpl.getInstance().start();
- long dataRegionEndTime = System.currentTimeMillis();
- logger.info(
- "DataRegion consensus start successfully, which takes {} ms.",
- (dataRegionEndTime - dataRegionStartTime));
- dataRegionConsensusStarted = true;
- }
-
} catch (Throwable e) {
int exitStatusCode = retrieveExitStatusCode(e);
logger.error("Fail to start server", e);
@@ -770,11 +750,11 @@ public class DataNode extends ServerCommandLine
implements DataNodeMBean {
*
* @throws StartupException if start up failed.
*/
- private void active() throws StartupException {
+ private void active() throws StartupException, IOException {
try {
processPid();
setUp();
- } catch (StartupException e) {
+ } catch (StartupException | IOException e) {
logger.error("Meet error while starting up.", e);
throw e;
}
@@ -808,7 +788,7 @@ public class DataNode extends ServerCommandLine implements
DataNodeMBean {
}
}
- private void setUp() throws StartupException {
+ private void setUp() throws StartupException, IOException {
logger.info("Setting up IoTDB DataNode...");
registerManager.register(new JMXService());
JMXService.registerMBean(getInstance(), mbeanName);
@@ -863,6 +843,33 @@ public class DataNode extends ServerCommandLine implements
DataNodeMBean {
registerManager.register(CompactionTaskManager.getInstance());
+ // Start PipeConsensus (DataRegionConsensus) before Internal RPC Service
and Pipe Agent
+ // recovery.
+ // This ensures consensus groups are registered so that deleteLocalPeer()
can succeed when
+ // DELETE_OLD_REGION_PEER requests arrive during the pipe recovery phase.
+ if (isUsingPipeConsensus()) {
+ long dataRegionStartTime = System.currentTimeMillis();
+ while (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(1000);
+ } catch (InterruptedException e) {
+ logger.warn("IoTDB DataNode failed to set up.", e);
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ DataRegionConsensusImpl.getInstance().start();
+ long dataRegionEndTime = System.currentTimeMillis();
+ logger.info(
+ "DataRegion consensus start successfully, which takes {} ms.",
+ (dataRegionEndTime - dataRegionStartTime));
+ dataRegionConsensusStarted = true;
+ }
+
+ // Start Internal RPC Service before pipe agent recovery, so that the
DataNode can accept
+ // cluster scheduling requests (e.g. DELETE_OLD_REGION_PEER) while pipe
recovery is in progress.
+ registerInternalRPCService();
+
// Register subscription agent before pipe agent
registerManager.register(SubscriptionAgent.runtime());
registerManager.register(PipeDataNodeAgent.runtime());
@@ -873,12 +880,8 @@ public class DataNode extends ServerCommandLine implements
DataNodeMBean {
/** Set up RPC and protocols after DataNode is available */
private void setUpRPCService() throws StartupException {
-
- registerInternalRPCService();
-
- // Notice: During the period between starting the internal RPC service
- // and starting the client RPC service , some requests may fail because
- // DataNode is not marked as RUNNING by ConfigNode-leader yet.
+ // Internal RPC Service is already started in setUp() before pipe agent
recovery,
+ // so we only need to start client RPC and protocols here.
// Start client RPCService to indicate that the current DataNode provide
external services
IoTDBDescriptor.getInstance()