This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9a67740018a [IoTV2] Add ConfigNode-side consensus pipe guardian and
remove deprecated DataNode-side checking (#17277)
9a67740018a is described below
commit 9a67740018af8eb2b2f69fce23c0634903e76921
Author: Peng Junzhi <[email protected]>
AuthorDate: Thu Mar 12 05:12:12 2026 -0500
[IoTV2] Add ConfigNode-side consensus pipe guardian and remove deprecated
DataNode-side checking (#17277)
* add consensus pipe check to config node
* add ut for consensus pipe checker
* remove datanode deprecated code
* fix review
---
.../pipe/coordinator/runtime/PipeMetaSyncer.java | 20 ++
.../pipe/coordinator/task/PipeTaskCoordinator.java | 6 +
.../confignode/persistence/pipe/PipeTaskInfo.java | 14 +
.../procedure/env/RegionMaintainHandler.java | 89 +++++++
.../pipe/PipeTaskInfoConsensusPipeTest.java | 152 +++++++++++
.../RegionMaintainHandlerConsensusPipeTest.java | 290 +++++++++++++++++++++
.../consensus/config/PipeConsensusConfig.java | 49 +---
.../apache/iotdb/consensus/pipe/PipeConsensus.java | 47 ----
.../consensus/pipe/PipeConsensusServerImpl.java | 43 ---
.../pipe/consensuspipe/ConsensusPipeGuardian.java | 26 --
.../pipe/consensuspipe/ConsensusPipeSelector.java | 28 --
.../db/consensus/DataRegionConsensusImpl.java | 6 -
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 21 --
.../ConsensusPipeDataNodeRuntimeAgentGuardian.java | 48 ----
14 files changed, 573 insertions(+), 266 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
index 1df5ac021ca..959dc7f4363 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
@@ -55,6 +55,9 @@ public class PipeMetaSyncer {
private Future<?> metaSyncFuture;
private final AtomicInteger pipeAutoRestartRoundCounter = new
AtomicInteger(0);
+ private final AtomicInteger consensusPipeCheckRoundCounter = new
AtomicInteger(0);
+
+ private static final int CONSENSUS_PIPE_CHECK_INTERVAL_ROUND = 5;
private final boolean pipeAutoRestartEnabled =
PipeConfig.getInstance().getPipeAutoRestartEnabled();
@@ -110,6 +113,11 @@ public class PipeMetaSyncer {
pipeAutoRestartRoundCounter.set(0);
}
+ if (consensusPipeCheckRoundCounter.incrementAndGet() >=
CONSENSUS_PIPE_CHECK_INTERVAL_ROUND) {
+ consensusPipeCheckRoundCounter.set(0);
+ checkAndRepairConsensusPipes();
+ }
+
final TSStatus metaSyncStatus = procedureManager.pipeMetaSync();
if (metaSyncStatus.getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -158,6 +166,18 @@ public class PipeMetaSyncer {
}
}
+ private void checkAndRepairConsensusPipes() {
+ try {
+ configManager
+ .getProcedureManager()
+ .getEnv()
+ .getRegionMaintainHandler()
+ .checkAndRepairConsensusPipes();
+ } catch (Exception e) {
+ LOGGER.warn("Failed to check and repair consensus pipes", e);
+ }
+ }
+
private boolean handleSuccessfulRestartWithLock() {
final AtomicReference<PipeTaskInfo> pipeTaskInfo =
configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
index ea9c61cf45e..e44e80075cd 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.confignode.manager.pipe.coordinator.task;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
import
org.apache.iotdb.confignode.consensus.request.read.pipe.task.ShowPipePlanV2;
import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
import org.apache.iotdb.confignode.manager.ConfigManager;
@@ -42,6 +43,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
public class PipeTaskCoordinator {
@@ -245,6 +247,10 @@ public class PipeTaskCoordinator {
return !pipeTaskInfo.isEmpty();
}
+ public Map<String, PipeStatus> getConsensusPipeStatusMap() {
+ return pipeTaskInfo.getConsensusPipeStatusMap();
+ }
+
/** Caller should ensure that the method is called in the write lock of
{@link #pipeTaskInfo}. */
public void updateLastSyncedVersion() {
pipeTaskInfo.updateLastSyncedVersion();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 929024a0689..78cf9a10eee 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -563,6 +563,20 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
}
+ public Map<String, PipeStatus> getConsensusPipeStatusMap() {
+ acquireReadLock();
+ try {
+ return
StreamSupport.stream(pipeMetaKeeper.getPipeMetaList().spliterator(), false)
+ .filter(pipeMeta ->
PipeType.CONSENSUS.equals(pipeMeta.getStaticMeta().getPipeType()))
+ .collect(
+ Collectors.toMap(
+ pipeMeta -> pipeMeta.getStaticMeta().getPipeName(),
+ pipeMeta -> pipeMeta.getRuntimeMeta().getStatus().get()));
+ } finally {
+ releaseReadLock();
+ }
+ }
+
public boolean isEmpty() {
acquireReadLock();
try {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
index f827adea5d4..0d6f306694b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
import
org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
@@ -509,6 +510,94 @@ public class RegionMaintainHandler {
}
}
+ /**
+ * Periodically called by PipeMetaSyncer to reconcile expected consensus
pipes (derived from
+ * PartitionManager replica sets) with actually existing consensus pipes
(from PipeTaskInfo).
+ * Creates missing pipes, drops unexpected pipes, and restarts stopped pipes.
+ */
+ public void checkAndRepairConsensusPipes() {
+ if (!IOT_CONSENSUS_V2.equals(CONF.getDataRegionConsensusProtocolClass())) {
+ return;
+ }
+
+ Map<TConsensusGroupId, TRegionReplicaSet> allDataRegionReplicaSets =
+
configManager.getPartitionManager().getAllReplicaSetsMap(TConsensusGroupType.DataRegion);
+
+ // Build expected pipe name -> TRegionReplicaSet for creation purposes
+ Map<String, TRegionReplicaSet> expectedPipeToReplicaSet = new HashMap<>();
+ for (TRegionReplicaSet replicaSet : allDataRegionReplicaSets.values()) {
+ List<TDataNodeLocation> locations = replicaSet.getDataNodeLocations();
+ DataRegionId regionId = new
DataRegionId(replicaSet.getRegionId().getId());
+ for (int i = 0; i < locations.size(); i++) {
+ for (int j = 0; j < locations.size(); j++) {
+ if (i != j) {
+ String pipeName =
+ new ConsensusPipeName(
+ regionId,
+ locations.get(i).getDataNodeId(),
+ locations.get(j).getDataNodeId())
+ .toString();
+ expectedPipeToReplicaSet.put(pipeName, replicaSet);
+ }
+ }
+ }
+ }
+
+ Map<String, PipeStatus> actualPipes =
+
configManager.getPipeManager().getPipeTaskCoordinator().getConsensusPipeStatusMap();
+
+ // Create missing pipes
+ for (Map.Entry<String, TRegionReplicaSet> entry :
expectedPipeToReplicaSet.entrySet()) {
+ String pipeName = entry.getKey();
+ if (!actualPipes.containsKey(pipeName)) {
+ LOGGER.warn(
+ "[ConsensusPipeGuardian] consensus pipe [{}] missing, creating
asynchronously",
+ pipeName);
+ TRegionReplicaSet replicaSet = entry.getValue();
+ ConsensusPipeName parsed = new ConsensusPipeName(pipeName);
+ TDataNodeLocation senderLocation =
+ findLocationByNodeId(replicaSet.getDataNodeLocations(),
parsed.getSenderDataNodeId());
+ TDataNodeLocation receiverLocation =
+ findLocationByNodeId(replicaSet.getDataNodeLocations(),
parsed.getReceiverDataNodeId());
+ if (senderLocation != null && receiverLocation != null) {
+ createSingleConsensusPipeAsync(
+ replicaSet.getRegionId(),
+ senderLocation.getDataNodeId(),
+ senderLocation.getDataRegionConsensusEndPoint(),
+ receiverLocation.getDataNodeId(),
+ receiverLocation.getDataRegionConsensusEndPoint());
+ }
+ }
+ }
+
+ // Drop unexpected pipes and restart stopped pipes
+ for (Map.Entry<String, PipeStatus> entry : actualPipes.entrySet()) {
+ String pipeName = entry.getKey();
+ PipeStatus status = entry.getValue();
+ if (!expectedPipeToReplicaSet.containsKey(pipeName)) {
+ LOGGER.warn(
+ "[ConsensusPipeGuardian] unexpected consensus pipe [{}] exists,
dropping asynchronously",
+ pipeName);
+ configManager.getProcedureManager().dropConsensusPipeAsync(pipeName);
+ } else if (PipeStatus.STOPPED.equals(status)) {
+ LOGGER.warn(
+ "[ConsensusPipeGuardian] consensus pipe [{}] is stopped,
restarting asynchronously",
+ pipeName);
+ configManager.getProcedureManager().startConsensusPipe(pipeName);
+ }
+ }
+ }
+
+ private static TDataNodeLocation findLocationByNodeId(
+ List<TDataNodeLocation> locations, int nodeId) {
+ for (TDataNodeLocation location : locations) {
+ if (location.getDataNodeId() == nodeId) {
+ return location;
+ }
+ }
+ return null;
+ }
+
private boolean isIoTConsensusV2DataRegion(TConsensusGroupId regionId) {
return TConsensusGroupType.DataRegion.equals(regionId.getType())
&& IOT_CONSENSUS_V2.equals(CONF.getDataRegionConsensusProtocolClass());
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoConsensusPipeTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoConsensusPipeTest.java
new file mode 100644
index 00000000000..094dcebe82f
--- /dev/null
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoConsensusPipeTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.persistence.pipe;
+
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
+import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
+import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class PipeTaskInfoConsensusPipeTest {
+
+ private PipeTaskInfo pipeTaskInfo;
+
+ @Before
+ public void setUp() {
+ pipeTaskInfo = new PipeTaskInfo();
+ }
+
+ private void createPipe(String pipeName, PipeStatus initialStatus) {
+ Map<String, String> extractorAttributes = new HashMap<>();
+ Map<String, String> processorAttributes = new HashMap<>();
+ Map<String, String> connectorAttributes = new HashMap<>();
+ extractorAttributes.put("extractor", "iotdb-source");
+ processorAttributes.put("processor", "do-nothing-processor");
+ connectorAttributes.put("connector", "iotdb-thrift-sink");
+
+ PipeTaskMeta pipeTaskMeta = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
+ ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>();
+ pipeTasks.put(1, pipeTaskMeta);
+ PipeStaticMeta pipeStaticMeta =
+ new PipeStaticMeta(
+ pipeName,
+ System.currentTimeMillis(),
+ extractorAttributes,
+ processorAttributes,
+ connectorAttributes);
+ PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks);
+ CreatePipePlanV2 createPlan = new CreatePipePlanV2(pipeStaticMeta,
pipeRuntimeMeta);
+ pipeTaskInfo.createPipe(createPlan);
+
+ if (PipeStatus.RUNNING.equals(initialStatus)) {
+ pipeTaskInfo.setPipeStatus(new SetPipeStatusPlanV2(pipeName,
PipeStatus.RUNNING));
+ }
+ }
+
+ @Test
+ public void testGetConsensusPipeStatusMapEmpty() {
+ Map<String, PipeStatus> result = pipeTaskInfo.getConsensusPipeStatusMap();
+ Assert.assertNotNull(result);
+ Assert.assertTrue(result.isEmpty());
+ }
+
+ @Test
+ public void testGetConsensusPipeStatusMapFiltersOnlyConsensusPipes() {
+ String userPipeName = "myUserPipe";
+ createPipe(userPipeName, PipeStatus.RUNNING);
+
+ DataRegionId regionId = new DataRegionId(100);
+ String consensusPipeName1 = new ConsensusPipeName(regionId, 1,
2).toString();
+ String consensusPipeName2 = new ConsensusPipeName(regionId, 2,
1).toString();
+ createPipe(consensusPipeName1, PipeStatus.RUNNING);
+ createPipe(consensusPipeName2, PipeStatus.STOPPED);
+
+ Map<String, PipeStatus> result = pipeTaskInfo.getConsensusPipeStatusMap();
+
+ Assert.assertEquals(2, result.size());
+ Assert.assertFalse(result.containsKey(userPipeName));
+ Assert.assertEquals(PipeStatus.RUNNING, result.get(consensusPipeName1));
+ Assert.assertEquals(PipeStatus.STOPPED, result.get(consensusPipeName2));
+ }
+
+ @Test
+ public void testGetConsensusPipeStatusMapWithOnlyUserPipes() {
+ createPipe("userPipe1", PipeStatus.RUNNING);
+ createPipe("userPipe2", PipeStatus.STOPPED);
+
+ Map<String, PipeStatus> result = pipeTaskInfo.getConsensusPipeStatusMap();
+
+ Assert.assertNotNull(result);
+ Assert.assertTrue(result.isEmpty());
+ }
+
+ @Test
+ public void testGetConsensusPipeStatusMapWithMultipleRegions() {
+ DataRegionId region1 = new DataRegionId(100);
+ DataRegionId region2 = new DataRegionId(200);
+
+ String pipe1 = new ConsensusPipeName(region1, 1, 2).toString();
+ String pipe2 = new ConsensusPipeName(region1, 2, 1).toString();
+ String pipe3 = new ConsensusPipeName(region2, 3, 4).toString();
+ String pipe4 = new ConsensusPipeName(region2, 4, 3).toString();
+ createPipe(pipe1, PipeStatus.RUNNING);
+ createPipe(pipe2, PipeStatus.RUNNING);
+ createPipe(pipe3, PipeStatus.STOPPED);
+ createPipe(pipe4, PipeStatus.RUNNING);
+
+ Map<String, PipeStatus> result = pipeTaskInfo.getConsensusPipeStatusMap();
+
+ Assert.assertEquals(4, result.size());
+ Assert.assertEquals(PipeStatus.RUNNING, result.get(pipe1));
+ Assert.assertEquals(PipeStatus.RUNNING, result.get(pipe2));
+ Assert.assertEquals(PipeStatus.STOPPED, result.get(pipe3));
+ Assert.assertEquals(PipeStatus.RUNNING, result.get(pipe4));
+ }
+
+ @Test
+ public void testGetConsensusPipeStatusMapExcludesSubscriptionPipes() {
+ String subscriptionPipeName = PipeStaticMeta.SUBSCRIPTION_PIPE_PREFIX +
"topic1.group1";
+ createPipe(subscriptionPipeName, PipeStatus.RUNNING);
+
+ DataRegionId regionId = new DataRegionId(100);
+ String consensusPipeName = new ConsensusPipeName(regionId, 1,
2).toString();
+ createPipe(consensusPipeName, PipeStatus.RUNNING);
+
+ Map<String, PipeStatus> result = pipeTaskInfo.getConsensusPipeStatusMap();
+
+ Assert.assertEquals(1, result.size());
+ Assert.assertTrue(result.containsKey(consensusPipeName));
+ Assert.assertFalse(result.containsKey(subscriptionPipeName));
+ }
+}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandlerConsensusPipeTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandlerConsensusPipeTest.java
new file mode 100644
index 00000000000..b384b9e0f0a
--- /dev/null
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandlerConsensusPipeTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.env;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.ProcedureManager;
+import org.apache.iotdb.confignode.manager.partition.PartitionManager;
+import org.apache.iotdb.confignode.manager.pipe.coordinator.PipeManager;
+import
org.apache.iotdb.confignode.manager.pipe.coordinator.task.PipeTaskCoordinator;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS_V2;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class RegionMaintainHandlerConsensusPipeTest {
+
+ private ConfigManager configManager;
+ private PartitionManager partitionManager;
+ private PipeManager pipeManager;
+ private PipeTaskCoordinator pipeTaskCoordinator;
+ private ProcedureManager procedureManager;
+ private RegionMaintainHandler handler;
+
+ private String originalConsensusProtocol;
+
+ @Before
+ public void setUp() {
+ originalConsensusProtocol =
+
ConfigNodeDescriptor.getInstance().getConf().getDataRegionConsensusProtocolClass();
+ ConfigNodeDescriptor.getInstance()
+ .getConf()
+ .setDataRegionConsensusProtocolClass(IOT_CONSENSUS_V2);
+
+ configManager = mock(ConfigManager.class);
+ partitionManager = mock(PartitionManager.class);
+ pipeManager = mock(PipeManager.class);
+ pipeTaskCoordinator = mock(PipeTaskCoordinator.class);
+ procedureManager = mock(ProcedureManager.class);
+
+ when(configManager.getPartitionManager()).thenReturn(partitionManager);
+ when(configManager.getPipeManager()).thenReturn(pipeManager);
+ when(configManager.getProcedureManager()).thenReturn(procedureManager);
+ when(pipeManager.getPipeTaskCoordinator()).thenReturn(pipeTaskCoordinator);
+
+ handler = new RegionMaintainHandler(configManager);
+ }
+
+ @After
+ public void tearDown() {
+ ConfigNodeDescriptor.getInstance()
+ .getConf()
+ .setDataRegionConsensusProtocolClass(originalConsensusProtocol);
+ }
+
+ private TDataNodeLocation makeLocation(int nodeId, String ip, int
consensusPort) {
+ TDataNodeLocation location = new TDataNodeLocation();
+ location.setDataNodeId(nodeId);
+ location.setClientRpcEndPoint(new TEndPoint(ip, 6667));
+ location.setInternalEndPoint(new TEndPoint(ip, 10730));
+ location.setMPPDataExchangeEndPoint(new TEndPoint(ip, 10740));
+ location.setDataRegionConsensusEndPoint(new TEndPoint(ip, consensusPort));
+ location.setSchemaRegionConsensusEndPoint(new TEndPoint(ip, 10760));
+ return location;
+ }
+
+ private TRegionReplicaSet makeReplicaSet(int regionId, TDataNodeLocation...
locations) {
+ TRegionReplicaSet replicaSet = new TRegionReplicaSet();
+ replicaSet.setRegionId(new
TConsensusGroupId(TConsensusGroupType.DataRegion, regionId));
+ replicaSet.setDataNodeLocations(Arrays.asList(locations));
+ return replicaSet;
+ }
+
+ @Test
+ public void testNoOpWhenNotIoTConsensusV2() {
+ ConfigNodeDescriptor.getInstance()
+ .getConf()
+
.setDataRegionConsensusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus");
+
+ handler.checkAndRepairConsensusPipes();
+
+ verify(partitionManager, never()).getAllReplicaSetsMap(any());
+ }
+
+ @Test
+ public void testNothingToDoWhenAllPipesMatch() {
+ TDataNodeLocation loc1 = makeLocation(1, "127.0.0.1", 40010);
+ TDataNodeLocation loc2 = makeLocation(2, "127.0.0.2", 40010);
+ TRegionReplicaSet replicaSet = makeReplicaSet(100, loc1, loc2);
+
+ TConsensusGroupId groupId = new
TConsensusGroupId(TConsensusGroupType.DataRegion, 100);
+ Map<TConsensusGroupId, TRegionReplicaSet> replicaSets = new HashMap<>();
+ replicaSets.put(groupId, replicaSet);
+ when(partitionManager.getAllReplicaSetsMap(TConsensusGroupType.DataRegion))
+ .thenReturn(replicaSets);
+
+ DataRegionId regionId = new DataRegionId(100);
+ String pipe1to2 = new ConsensusPipeName(regionId, 1, 2).toString();
+ String pipe2to1 = new ConsensusPipeName(regionId, 2, 1).toString();
+ Map<String, PipeStatus> actualPipes = new HashMap<>();
+ actualPipes.put(pipe1to2, PipeStatus.RUNNING);
+ actualPipes.put(pipe2to1, PipeStatus.RUNNING);
+
when(pipeTaskCoordinator.getConsensusPipeStatusMap()).thenReturn(actualPipes);
+
+ handler.checkAndRepairConsensusPipes();
+
+ verify(procedureManager, never()).createConsensusPipeAsync(any());
+ verify(procedureManager, never()).dropConsensusPipeAsync(any());
+ verify(procedureManager, never()).startConsensusPipe(any());
+ }
+
+ @Test
+ public void testCreatesMissingPipes() {
+ TDataNodeLocation loc1 = makeLocation(1, "127.0.0.1", 40010);
+ TDataNodeLocation loc2 = makeLocation(2, "127.0.0.2", 40010);
+ TRegionReplicaSet replicaSet = makeReplicaSet(100, loc1, loc2);
+
+ TConsensusGroupId groupId = new
TConsensusGroupId(TConsensusGroupType.DataRegion, 100);
+ Map<TConsensusGroupId, TRegionReplicaSet> replicaSets = new HashMap<>();
+ replicaSets.put(groupId, replicaSet);
+ when(partitionManager.getAllReplicaSetsMap(TConsensusGroupType.DataRegion))
+ .thenReturn(replicaSets);
+
+
when(pipeTaskCoordinator.getConsensusPipeStatusMap()).thenReturn(Collections.emptyMap());
+
+ handler.checkAndRepairConsensusPipes();
+
+ verify(procedureManager,
times(2)).createConsensusPipeAsync(any(TCreatePipeReq.class));
+ verify(procedureManager, never()).dropConsensusPipeAsync(any());
+ verify(procedureManager, never()).startConsensusPipe(any());
+ }
+
+ @Test
+ public void testDropsUnexpectedPipes() {
+ when(partitionManager.getAllReplicaSetsMap(TConsensusGroupType.DataRegion))
+ .thenReturn(Collections.emptyMap());
+
+ DataRegionId regionId = new DataRegionId(999);
+ String unexpectedPipe = new ConsensusPipeName(regionId, 1, 2).toString();
+ Map<String, PipeStatus> actualPipes = new HashMap<>();
+ actualPipes.put(unexpectedPipe, PipeStatus.RUNNING);
+
when(pipeTaskCoordinator.getConsensusPipeStatusMap()).thenReturn(actualPipes);
+
+ handler.checkAndRepairConsensusPipes();
+
+ verify(procedureManager, never()).createConsensusPipeAsync(any());
+ verify(procedureManager, times(1)).dropConsensusPipeAsync(unexpectedPipe);
+ verify(procedureManager, never()).startConsensusPipe(any());
+ }
+
+ @Test
+ public void testRestartsStoppedPipes() {
+ TDataNodeLocation loc1 = makeLocation(1, "127.0.0.1", 40010);
+ TDataNodeLocation loc2 = makeLocation(2, "127.0.0.2", 40010);
+ TRegionReplicaSet replicaSet = makeReplicaSet(100, loc1, loc2);
+
+ TConsensusGroupId groupId = new
TConsensusGroupId(TConsensusGroupType.DataRegion, 100);
+ Map<TConsensusGroupId, TRegionReplicaSet> replicaSets = new HashMap<>();
+ replicaSets.put(groupId, replicaSet);
+ when(partitionManager.getAllReplicaSetsMap(TConsensusGroupType.DataRegion))
+ .thenReturn(replicaSets);
+
+ DataRegionId regionId = new DataRegionId(100);
+ String pipe1to2 = new ConsensusPipeName(regionId, 1, 2).toString();
+ String pipe2to1 = new ConsensusPipeName(regionId, 2, 1).toString();
+ Map<String, PipeStatus> actualPipes = new HashMap<>();
+ actualPipes.put(pipe1to2, PipeStatus.RUNNING);
+ actualPipes.put(pipe2to1, PipeStatus.STOPPED);
+
when(pipeTaskCoordinator.getConsensusPipeStatusMap()).thenReturn(actualPipes);
+
+ handler.checkAndRepairConsensusPipes();
+
+ verify(procedureManager, never()).createConsensusPipeAsync(any());
+ verify(procedureManager, never()).dropConsensusPipeAsync(any());
+ verify(procedureManager, times(1)).startConsensusPipe(pipe2to1);
+ }
+
+ @Test
+ public void testMixedScenario() {
+ TDataNodeLocation loc1 = makeLocation(1, "127.0.0.1", 40010);
+ TDataNodeLocation loc2 = makeLocation(2, "127.0.0.2", 40010);
+ TDataNodeLocation loc3 = makeLocation(3, "127.0.0.3", 40010);
+ TRegionReplicaSet replicaSet = makeReplicaSet(100, loc1, loc2, loc3);
+
+ TConsensusGroupId groupId = new
TConsensusGroupId(TConsensusGroupType.DataRegion, 100);
+ Map<TConsensusGroupId, TRegionReplicaSet> replicaSets = new HashMap<>();
+ replicaSets.put(groupId, replicaSet);
+ when(partitionManager.getAllReplicaSetsMap(TConsensusGroupType.DataRegion))
+ .thenReturn(replicaSets);
+
+ DataRegionId regionId100 = new DataRegionId(100);
+ DataRegionId regionId999 = new DataRegionId(999);
+
+ // 3 nodes => 6 expected pipes: 1->2, 1->3, 2->1, 2->3, 3->1, 3->2
+ // Provide only 3 of the 6, one stopped; plus one unexpected pipe
+ String pipe1to2 = new ConsensusPipeName(regionId100, 1, 2).toString();
+ String pipe2to1 = new ConsensusPipeName(regionId100, 2, 1).toString();
+ String pipe3to1 = new ConsensusPipeName(regionId100, 3, 1).toString();
+ String unexpected = new ConsensusPipeName(regionId999, 5, 6).toString();
+
+ Map<String, PipeStatus> actualPipes = new HashMap<>();
+ actualPipes.put(pipe1to2, PipeStatus.RUNNING);
+ actualPipes.put(pipe2to1, PipeStatus.STOPPED);
+ actualPipes.put(pipe3to1, PipeStatus.RUNNING);
+ actualPipes.put(unexpected, PipeStatus.RUNNING);
+
when(pipeTaskCoordinator.getConsensusPipeStatusMap()).thenReturn(actualPipes);
+
+ handler.checkAndRepairConsensusPipes();
+
+ // Missing: 1->3, 2->3, 3->2 => 3 creates
+ verify(procedureManager,
times(3)).createConsensusPipeAsync(any(TCreatePipeReq.class));
+ // Unexpected pipe => 1 drop
+ verify(procedureManager, times(1)).dropConsensusPipeAsync(unexpected);
+ // Stopped pipe 2->1 => 1 restart
+ verify(procedureManager, times(1)).startConsensusPipe(pipe2to1);
+ }
+
+ @Test
+ public void testThreeNodeReplicaSetCreatesAllSixPipes() {
+ TDataNodeLocation loc1 = makeLocation(1, "127.0.0.1", 40010);
+ TDataNodeLocation loc2 = makeLocation(2, "127.0.0.2", 40010);
+ TDataNodeLocation loc3 = makeLocation(3, "127.0.0.3", 40010);
+ TRegionReplicaSet replicaSet = makeReplicaSet(100, loc1, loc2, loc3);
+
+ TConsensusGroupId groupId = new
TConsensusGroupId(TConsensusGroupType.DataRegion, 100);
+ Map<TConsensusGroupId, TRegionReplicaSet> replicaSets = new HashMap<>();
+ replicaSets.put(groupId, replicaSet);
+ when(partitionManager.getAllReplicaSetsMap(TConsensusGroupType.DataRegion))
+ .thenReturn(replicaSets);
+
+
when(pipeTaskCoordinator.getConsensusPipeStatusMap()).thenReturn(Collections.emptyMap());
+
+ handler.checkAndRepairConsensusPipes();
+
+ // 3 nodes => 3*2 = 6 pipes to create
+ verify(procedureManager,
times(6)).createConsensusPipeAsync(any(TCreatePipeReq.class));
+ }
+
+ @Test
+ public void testEmptyReplicaSetsAndEmptyPipes() {
+ when(partitionManager.getAllReplicaSetsMap(TConsensusGroupType.DataRegion))
+ .thenReturn(Collections.emptyMap());
+
when(pipeTaskCoordinator.getConsensusPipeStatusMap()).thenReturn(Collections.emptyMap());
+
+ handler.checkAndRepairConsensusPipes();
+
+ verify(procedureManager, never()).createConsensusPipeAsync(any());
+ verify(procedureManager, never()).dropConsensusPipeAsync(any());
+ verify(procedureManager, never()).startConsensusPipe(any());
+ }
+}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java
index c0d7257183d..28625f1e9cd 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java
@@ -20,9 +20,7 @@
package org.apache.iotdb.consensus.config;
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
-import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeGuardian;
import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeReceiver;
-import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeSelector;
import org.apache.iotdb.consensus.pipe.consensuspipe.ReplicateProgressManager;
import java.util.concurrent.TimeUnit;
@@ -243,29 +241,20 @@ public class PipeConsensusConfig {
private final String extractorPluginName;
private final String processorPluginName;
private final String connectorPluginName;
- private final ConsensusPipeGuardian consensusPipeGuardian;
- private final ConsensusPipeSelector consensusPipeSelector;
private final ReplicateProgressManager replicateProgressManager;
private final ConsensusPipeReceiver consensusPipeReceiver;
- private final long consensusPipeGuardJobIntervalInSeconds;
public Pipe(
String extractorPluginName,
String processorPluginName,
String connectorPluginName,
- ConsensusPipeGuardian consensusPipeGuardian,
- ConsensusPipeSelector consensusPipeSelector,
ReplicateProgressManager replicateProgressManager,
- ConsensusPipeReceiver consensusPipeReceiver,
- long consensusPipeGuardJobIntervalInSeconds) {
+ ConsensusPipeReceiver consensusPipeReceiver) {
this.extractorPluginName = extractorPluginName;
this.processorPluginName = processorPluginName;
this.connectorPluginName = connectorPluginName;
- this.consensusPipeGuardian = consensusPipeGuardian;
- this.consensusPipeSelector = consensusPipeSelector;
this.replicateProgressManager = replicateProgressManager;
this.consensusPipeReceiver = consensusPipeReceiver;
- this.consensusPipeGuardJobIntervalInSeconds =
consensusPipeGuardJobIntervalInSeconds;
}
public String getExtractorPluginName() {
@@ -280,14 +269,6 @@ public class PipeConsensusConfig {
return connectorPluginName;
}
- public ConsensusPipeGuardian getConsensusPipeGuardian() {
- return consensusPipeGuardian;
- }
-
- public ConsensusPipeSelector getConsensusPipeSelector() {
- return consensusPipeSelector;
- }
-
public ConsensusPipeReceiver getConsensusPipeReceiver() {
return consensusPipeReceiver;
}
@@ -296,10 +277,6 @@ public class PipeConsensusConfig {
return replicateProgressManager;
}
- public long getConsensusPipeGuardJobIntervalInSeconds() {
- return consensusPipeGuardJobIntervalInSeconds;
- }
-
public static Pipe.Builder newBuilder() {
return new Pipe.Builder();
}
@@ -310,11 +287,8 @@ public class PipeConsensusConfig {
BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName();
private String connectorPluginName =
BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName();
- private ConsensusPipeGuardian consensusPipeGuardian = null;
- private ConsensusPipeSelector consensusPipeSelector = null;
private ReplicateProgressManager replicateProgressManager = null;
private ConsensusPipeReceiver consensusPipeReceiver = null;
- private long consensusPipeGuardJobIntervalInSeconds = 180L;
public Pipe.Builder setExtractorPluginName(String extractorPluginName) {
this.extractorPluginName = extractorPluginName;
@@ -331,16 +305,6 @@ public class PipeConsensusConfig {
return this;
}
- public Pipe.Builder setConsensusPipeGuardian(ConsensusPipeGuardian
consensusPipeGuardian) {
- this.consensusPipeGuardian = consensusPipeGuardian;
- return this;
- }
-
- public Pipe.Builder setConsensusPipeSelector(ConsensusPipeSelector
consensusPipeSelector) {
- this.consensusPipeSelector = consensusPipeSelector;
- return this;
- }
-
public Pipe.Builder setConsensusPipeReceiver(ConsensusPipeReceiver
consensusPipeReceiver) {
this.consensusPipeReceiver = consensusPipeReceiver;
return this;
@@ -352,22 +316,13 @@ public class PipeConsensusConfig {
return this;
}
- public Pipe.Builder setConsensusPipeGuardJobIntervalInSeconds(
- long consensusPipeGuardJobIntervalInSeconds) {
- this.consensusPipeGuardJobIntervalInSeconds =
consensusPipeGuardJobIntervalInSeconds;
- return this;
- }
-
public Pipe build() {
return new Pipe(
extractorPluginName,
processorPluginName,
connectorPluginName,
- consensusPipeGuardian,
- consensusPipeSelector,
replicateProgressManager,
- consensusPipeReceiver,
- consensusPipeGuardJobIntervalInSeconds);
+ consensusPipeReceiver);
}
}
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
index 33d73d673bf..2981a1a9204 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
@@ -27,7 +27,6 @@ import
org.apache.iotdb.commons.client.sync.SyncPipeConsensusServiceClient;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import
org.apache.iotdb.commons.consensus.iotv2.container.IoTV2GlobalComponentContainer;
import org.apache.iotdb.commons.exception.StartupException;
-import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
import org.apache.iotdb.commons.service.RegisterManager;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.commons.utils.KillPoint.DataNodeKillPoints;
@@ -50,16 +49,12 @@ import
org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
import
org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
-import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeGuardian;
-import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
-import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeSelector;
import org.apache.iotdb.consensus.pipe.service.PipeConsensusRPCService;
import
org.apache.iotdb.consensus.pipe.service.PipeConsensusRPCServiceProcessor;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,10 +77,8 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
-import java.util.stream.Collectors;
public class PipeConsensus implements IConsensus {
- private static final String CONSENSUS_PIPE_GUARDIAN_TASK_ID =
"consensus_pipe_guardian";
private static final String CLASS_NAME = PipeConsensus.class.getSimpleName();
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConsensus.class);
@@ -101,8 +94,6 @@ public class PipeConsensus implements IConsensus {
new ConcurrentHashMap<>();
private final ReentrantReadWriteLock stateMachineMapLock = new
ReentrantReadWriteLock();
private final PipeConsensusConfig config;
- private final ConsensusPipeSelector consensusPipeSelector;
- private final ConsensusPipeGuardian consensusPipeGuardian;
private final IClientManager<TEndPoint, AsyncPipeConsensusServiceClient>
asyncClientManager;
private final IClientManager<TEndPoint, SyncPipeConsensusServiceClient>
syncClientManager;
private Map<ConsensusGroupId, List<Peer>> correctPeerListBeforeStart = null;
@@ -114,10 +105,6 @@ public class PipeConsensus implements IConsensus {
this.config = config.getPipeConsensusConfig();
this.registry = registry;
this.rpcService = new PipeConsensusRPCService(thisNode,
config.getPipeConsensusConfig());
- this.consensusPipeSelector =
- config.getPipeConsensusConfig().getPipe().getConsensusPipeSelector();
- this.consensusPipeGuardian =
- config.getPipeConsensusConfig().getPipe().getConsensusPipeGuardian();
this.asyncClientManager =
IoTV2GlobalComponentContainer.getInstance().getGlobalAsyncClientManager();
this.syncClientManager =
@@ -145,11 +132,6 @@ public class PipeConsensus implements IConsensus {
Thread.currentThread().interrupt();
LOGGER.warn("IoTV2 Recover Task is interrupted", ie);
}
- // only when we recover all consensus group can we launch async backend
checker thread
- consensusPipeGuardian.start(
- CONSENSUS_PIPE_GUARDIAN_TASK_ID,
- this::checkAllConsensusPipe,
- config.getPipe().getConsensusPipeGuardJobIntervalInSeconds());
}
private Future<Void> initAndRecover() throws IOException {
@@ -233,39 +215,10 @@ public class PipeConsensus implements IConsensus {
asyncClientManager.close();
syncClientManager.close();
registerManager.deregisterAll();
- consensusPipeGuardian.stop();
stateMachineMap.values().parallelStream().forEach(PipeConsensusServerImpl::stop);
IoTV2GlobalComponentContainer.getInstance().stopBackgroundTaskService();
}
- private void checkAllConsensusPipe() {
- final Map<ConsensusGroupId, Map<ConsensusPipeName, PipeStatus>>
existedPipes =
- consensusPipeSelector.getAllConsensusPipe().entrySet().stream()
- .filter(entry -> entry.getKey().getSenderDataNodeId() ==
thisNodeId)
- .collect(
- Collectors.groupingBy(
- entry -> entry.getKey().getConsensusGroupId(),
- Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
- stateMachineMapLock.writeLock().lock();
- try {
- stateMachineMap.forEach(
- (key, value) ->
- value.checkConsensusPipe(existedPipes.getOrDefault(key,
ImmutableMap.of())));
- // Log orphaned pipes (region no longer exists locally); ConfigNode
handles actual cleanup.
- existedPipes.entrySet().stream()
- .filter(entry -> !stateMachineMap.containsKey(entry.getKey()))
- .flatMap(entry -> entry.getValue().keySet().stream())
- .forEach(
- consensusPipeName ->
- LOGGER.warn(
- "{} orphaned consensus pipe [{}] found, should be
dropped by ConfigNode",
- consensusPipeName.getConsensusGroupId(),
- consensusPipeName));
- } finally {
- stateMachineMapLock.writeLock().unlock();
- }
- }
-
@Override
public TSStatus write(ConsensusGroupId groupId, IConsensusRequest request)
throws ConsensusException {
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
index 7aed02e075f..2ef293a960a 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
-import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
import org.apache.iotdb.commons.utils.KillPoint.DataNodeKillPoints;
@@ -56,7 +55,6 @@ import
org.apache.iotdb.consensus.pipe.thrift.TWaitReleaseAllRegionRelatedResour
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.RpcUtils;
-import com.google.common.collect.ImmutableMap;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,7 +62,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -135,46 +132,6 @@ public class PipeConsensusServerImpl {
active.set(false);
}
- /**
- * Detect inconsistencies between expected and existed consensus pipes.
Actual remediation
- * (create/drop/update) is handled by ConfigNode; this method only logs
warnings.
- */
- public synchronized void checkConsensusPipe(Map<ConsensusPipeName,
PipeStatus> existedPipes) {
- final PipeStatus expectedStatus = isStarted.get() ? PipeStatus.RUNNING :
PipeStatus.STOPPED;
- final Map<ConsensusPipeName, Peer> expectedPipes =
- peerManager.getOtherPeers(thisNode).stream()
- .collect(
- ImmutableMap.toImmutableMap(
- peer -> new ConsensusPipeName(thisNode, peer), peer ->
peer));
-
- existedPipes.forEach(
- (existedName, existedStatus) -> {
- if (!expectedPipes.containsKey(existedName)) {
- LOGGER.warn(
- "{} unexpected consensus pipe [{}] exists, should be dropped
by ConfigNode",
- consensusGroupId,
- existedName);
- } else if (!expectedStatus.equals(existedStatus)) {
- LOGGER.warn(
- "{} consensus pipe [{}] status mismatch: expected={},
actual={}",
- consensusGroupId,
- existedName,
- expectedStatus,
- existedStatus);
- }
- });
-
- expectedPipes.forEach(
- (expectedName, expectedPeer) -> {
- if (!existedPipes.containsKey(expectedName)) {
- LOGGER.warn(
- "{} consensus pipe [{}] missing, should be created by
ConfigNode",
- consensusGroupId,
- expectedName);
- }
- });
- }
-
public TSStatus write(IConsensusRequest request) {
stateMachineLock.lock();
try {
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeGuardian.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeGuardian.java
deleted file mode 100644
index 6c1e9cc1eb1..00000000000
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeGuardian.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.consensus.pipe.consensuspipe;
-
-public interface ConsensusPipeGuardian {
- void start(String id, Runnable guardJob, long intervalInSeconds);
-
- void stop();
-}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeSelector.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeSelector.java
deleted file mode 100644
index 6af75b5718e..00000000000
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeSelector.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.consensus.pipe.consensuspipe;
-
-import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
-
-import java.util.Map;
-
-public interface ConsensusPipeSelector {
- Map<ConsensusPipeName, PipeStatus> getAllConsensusPipe();
-}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index a33cdd11024..0a908ff8967 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -44,7 +44,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.consensus.statemachine.dataregion.DataRegionStateMachine;
import
org.apache.iotdb.db.consensus.statemachine.dataregion.IoTConsensusDataRegionStateMachine;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
-import
org.apache.iotdb.db.pipe.consensus.ConsensusPipeDataNodeRuntimeAgentGuardian;
import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
import org.apache.iotdb.db.storageengine.StorageEngine;
@@ -186,13 +185,8 @@ public class DataRegionConsensusImpl {
BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName())
.setConnectorPluginName(
BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName())
- // name
- .setConsensusPipeGuardian(new
ConsensusPipeDataNodeRuntimeAgentGuardian())
- .setConsensusPipeSelector(
- () ->
PipeDataNodeAgent.task().getAllConsensusPipe())
.setConsensusPipeReceiver(PipeDataNodeAgent.receiver().pipeConsensus())
.setProgressIndexManager(new
ReplicateProgressDataNodeManager())
- .setConsensusPipeGuardJobIntervalInSeconds(300)
.build())
.setReplicateMode(ReplicateMode.fromValue(CONF.getIotConsensusV2Mode()))
.build())
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 2f1be77fe3e..0f8b9446d60 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
-import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeType;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
@@ -46,7 +45,6 @@ import
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.consensus.exception.ConsensusException;
-import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
@@ -75,7 +73,6 @@ import
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
-import com.google.common.collect.ImmutableMap;
import org.apache.thrift.TException;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
@@ -102,7 +99,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_END_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE;
@@ -673,23 +669,6 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
}
}
- public Map<ConsensusPipeName, PipeStatus> getAllConsensusPipe() {
- if (!tryReadLockWithTimeOut(10)) {
- throw new PipeException("Failed to get all consensus pipe.");
- }
-
- try {
- return
StreamSupport.stream(pipeMetaKeeper.getPipeMetaList().spliterator(), false)
- .filter(pipeMeta ->
PipeType.CONSENSUS.equals(pipeMeta.getStaticMeta().getPipeType()))
- .collect(
- ImmutableMap.toImmutableMap(
- pipeMeta -> new
ConsensusPipeName(pipeMeta.getStaticMeta().getPipeName()),
- pipeMeta -> pipeMeta.getRuntimeMeta().getStatus().get()));
- } finally {
- releaseReadLock();
- }
- }
-
@Override
protected void calculateMemoryUsage(
final PipeStaticMeta staticMeta,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeRuntimeAgentGuardian.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeRuntimeAgentGuardian.java
deleted file mode 100644
index 2f3cf532fc3..00000000000
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeRuntimeAgentGuardian.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.consensus;
-
-import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeGuardian;
-import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ConsensusPipeDataNodeRuntimeAgentGuardian implements
ConsensusPipeGuardian {
- private static final Logger LOGGER =
- LoggerFactory.getLogger(ConsensusPipeDataNodeRuntimeAgentGuardian.class);
- private boolean registered = false;
-
- @Override
- public synchronized void start(String id, Runnable guardJob, long
intervalInSeconds) {
- if (!registered) {
- LOGGER.info(
- "Registering periodical job {} with interval in seconds {}.", id,
intervalInSeconds);
-
- this.registered = true;
- PipeDataNodeAgent.runtime().registerPeriodicalJob(id, guardJob,
intervalInSeconds);
- }
- }
-
- @Override
- public synchronized void stop() {
- // Do nothing because PipePeriodicalJobExecutor currently has no
deregister logic
- }
-}