This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9bfe0b0a78e Pipe: Made the historical pipe split auto dropped after
completion (#17295)
9bfe0b0a78e is described below
commit 9bfe0b0a78eec2f55b0d3f5388669f488b0e4a4b
Author: Caideyipi <[email protected]>
AuthorDate: Tue Mar 31 11:21:42 2026 +0800
Pipe: Made the historical pipe split auto dropped after completion (#17295)
* snapshot
* may-comp
* auto
---
.../manual/enhanced/IoTDBPipeAutoDropIT.java | 123 ++++++++++++---------
.../config/executor/ClusterConfigTaskExecutor.java | 2 +
2 files changed, 70 insertions(+), 55 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoDropIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoDropIT.java
index 6ab974c2f1e..ff906d8a5af 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoDropIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoDropIT.java
@@ -24,10 +24,12 @@ import
org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2DualTableManualEnhanced;
+import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils;
import
org.apache.iotdb.pipe.it.dual.tablemodel.manual.AbstractPipeTableModelDualManualIT;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -40,6 +42,7 @@ import org.junit.runner.RunWith;
import java.sql.Connection;
import java.sql.ResultSet;
+import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.HashMap;
@@ -49,6 +52,7 @@ import java.util.function.Consumer;
import static org.apache.iotdb.util.MagicUtils.makeItCloseQuietly;
import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2DualTableManualEnhanced.class})
@@ -60,6 +64,34 @@ public class IoTDBPipeAutoDropIT extends
AbstractPipeTableModelDualManualIT {
super.setUp();
}
+ protected void setupConfig() {
+ // Enable auto split
+ senderEnv
+ .getConfig()
+ .getCommonConfig()
+ .setAutoCreateSchemaEnabled(true)
+ .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setEnforceStrongPassword(false)
+ .setPipeMemoryManagementEnabled(false)
+ .setIsPipeEnableMemoryCheck(false);
+ receiverEnv
+ .getConfig()
+ .getCommonConfig()
+ .setAutoCreateSchemaEnabled(true)
+ .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setEnforceStrongPassword(false)
+ .setPipeMemoryManagementEnabled(false)
+ .setIsPipeEnableMemoryCheck(false);
+
+ // 10 min, assert that the operations will not time out
+ senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
+ receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
+
+
senderEnv.getConfig().getConfigNodeConfig().setLeaderDistributionPolicy("HASH");
+ }
+
@Test
public void testAutoDropInHistoricalTransfer() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
@@ -70,48 +102,29 @@ public class IoTDBPipeAutoDropIT extends
AbstractPipeTableModelDualManualIT {
TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
};
- final String receiverIp = receiverDataNode.getIp();
- final int receiverPort = receiverDataNode.getPort();
-
- try (final SyncConfigNodeIServiceClient client =
- (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
-
- TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
- TableModelUtils.insertData("test", "test", 0, 100, senderEnv);
-
- final Map<String, String> extractorAttributes = new HashMap<>();
- final Map<String, String> processorAttributes = new HashMap<>();
- final Map<String, String> connectorAttributes = new HashMap<>();
-
- extractorAttributes.put("mode.snapshot", "true");
- extractorAttributes.put("capture.table", "true");
- extractorAttributes.put("user", "root");
-
- connectorAttributes.put("connector", "iotdb-thrift-connector");
- connectorAttributes.put("connector.batch.enable", "false");
- connectorAttributes.put("connector.ip", receiverIp);
- connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
-
- final TSStatus status =
- client.createPipe(
- new TCreatePipeReq("p1", connectorAttributes)
- .setExtractorAttributes(extractorAttributes)
- .setProcessorAttributes(processorAttributes));
+ // Create an ordinary full sync pipe
+ final String sql =
+ String.format("create pipe a2b ('node-urls'='%s')",
receiverDataNode.getIpAndPortString());
+ try (final Connection connection =
senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+ final Statement statement = connection.createStatement()) {
+ statement.execute(sql);
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
+ TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
+ TableModelUtils.insertData("test", "test", 0, 100, senderEnv);
- TestUtils.assertDataEventuallyOnEnv(
- receiverEnv,
- TableModelUtils.getQueryCountSql("test"),
- "_col0,",
- Collections.singleton("100,"),
- "test",
- handleFailure);
- }
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ TableModelUtils.getQueryCountSql("test"),
+ "_col0,",
+ Collections.singleton("100,"),
+ "test",
+ handleFailure);
- try (final Connection connection =
makeItCloseQuietly(senderEnv.getConnection());
+ try (final Connection connection =
+
makeItCloseQuietly(senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT));
final Statement statement =
makeItCloseQuietly(connection.createStatement()); ) {
ResultSet result = statement.executeQuery("show pipes");
await()
@@ -124,9 +137,9 @@ public class IoTDBPipeAutoDropIT extends
AbstractPipeTableModelDualManualIT {
try {
int pipeNum = 0;
while (result.next()) {
- if (!result
- .getString(ColumnHeaderConstant.ID)
- .contains(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
+ final String pipeName =
result.getString(ColumnHeaderConstant.ID);
+ if
(!pipeName.contains(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)
+ && pipeName.endsWith("_history")) {
pipeNum++;
}
}
@@ -157,25 +170,25 @@ public class IoTDBPipeAutoDropIT extends
AbstractPipeTableModelDualManualIT {
TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
TableModelUtils.insertData("test", "test", 0, 100, senderEnv);
- final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> sourceAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
- final Map<String, String> connectorAttributes = new HashMap<>();
+ final Map<String, String> sinkAttributes = new HashMap<>();
- extractorAttributes.put("mode.snapshot", "true");
- extractorAttributes.put("capture.table", "true");
- extractorAttributes.put("start-time", "0");
- extractorAttributes.put("end-time", "49");
- extractorAttributes.put("user", "root");
+ sourceAttributes.put("mode.snapshot", "true");
+ sourceAttributes.put("capture.table", "true");
+ sourceAttributes.put("start-time", "0");
+ sourceAttributes.put("end-time", "49");
+ sourceAttributes.put("user", "root");
- connectorAttributes.put("connector", "iotdb-thrift-connector");
- connectorAttributes.put("connector.batch.enable", "false");
- connectorAttributes.put("connector.ip", receiverIp);
- connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+ sinkAttributes.put("sink", "iotdb-thrift-sink");
+ sinkAttributes.put("sink.batch.enable", "false");
+ sinkAttributes.put("sink.ip", receiverIp);
+ sinkAttributes.put("sink.port", Integer.toString(receiverPort));
final TSStatus status =
client.createPipe(
- new TCreatePipeReq("p1", connectorAttributes)
- .setExtractorAttributes(extractorAttributes)
+ new TCreatePipeReq("p1", sinkAttributes)
+ .setExtractorAttributes(sourceAttributes)
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index fc28d7a5f32..de3261b0484 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -2253,6 +2253,8 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
Boolean.toString(false),
PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
Boolean.toString(true),
+ PipeSourceConstant.EXTRACTOR_MODE_KEY,
+
PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE,
// We force the historical pipe to
transfer data (and maybe
// deletion) only
// Thus we can transfer schema only once