This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 6348c2db2a0 [to dev/1.3] Pipe: backport auto split fixes (#17866)
6348c2db2a0 is described below
commit 6348c2db2a04973d203659380b849efabbffb0f2
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 9 15:14:42 2026 +0800
[to dev/1.3] Pipe: backport auto split fixes (#17866)
* Pipe: Fixed the bug that separated historical pipe may not include mod on
deletion & The pipe without data.insert may be wrongly separated by pipe and
transfer data (#17346)
* fix
* fix
* f
(cherry picked from commit 355a872cfaeb77e1e7fe68d78a90544b3054d3a5)
* Pipe: Made the historical pipe split auto dropped after completion
(#17295)
* snapshot
* may-comp
* auto
(cherry picked from commit 9bfe0b0a78eec2f55b0d3f5388669f488b0e4a4b)
---
.../pipe/it/autocreate/IoTDBPipeAutoDropIT.java | 92 ++++++++++++----------
.../pipe/it/autocreate/IoTDBPipeAutoSplitIT.java | 55 +++++++++++--
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 7 +-
.../config/executor/ClusterConfigTaskExecutor.java | 62 ++++++++-------
4 files changed, 138 insertions(+), 78 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
index dc1d1fc93ae..09657e2deb2 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
@@ -21,8 +21,10 @@ package org.apache.iotdb.pipe.it.autocreate;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
@@ -33,59 +35,67 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2AutoCreateSchema.class})
public class IoTDBPipeAutoDropIT extends AbstractPipeDualAutoIT {
+ @Override
+ protected void setupConfig() {
+ super.setupConfig();
+ senderEnv.getConfig().getCommonConfig().setPipeAutoSplitFullEnabled(true);
+
receiverEnv.getConfig().getCommonConfig().setPipeAutoSplitFullEnabled(true);
+
senderEnv.getConfig().getConfigNodeConfig().setLeaderDistributionPolicy("HASH");
+ }
+
@Test
public void testAutoDropInHistoricalTransfer() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
- final String receiverIp = receiverDataNode.getIp();
- final int receiverPort = receiverDataNode.getPort();
-
- try (final SyncConfigNodeIServiceClient client =
- (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
-
- TestUtils.executeNonQuery(senderEnv, "insert into root.db.d1(time,s1)
values (1,1)", null);
-
- final Map<String, String> extractorAttributes = new HashMap<>();
- final Map<String, String> processorAttributes = new HashMap<>();
- final Map<String, String> connectorAttributes = new HashMap<>();
-
- extractorAttributes.put("extractor.mode", "query");
-
- connectorAttributes.put("connector", "iotdb-thrift-connector");
- connectorAttributes.put("connector.batch.enable", "false");
- connectorAttributes.put("connector.ip", receiverIp);
- connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
-
- final TSStatus status =
- client.createPipe(
- new TCreatePipeReq("p1", connectorAttributes)
- .setExtractorAttributes(extractorAttributes)
- .setProcessorAttributes(processorAttributes));
-
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
-
- TestUtils.assertDataEventuallyOnEnv(
- receiverEnv,
- "select count(*) from root.**",
- "count(root.db.d1.s1),",
- Collections.singleton("1,"));
-
- TestUtils.assertDataEventuallyOnEnv(
- senderEnv,
- "show pipes",
-
"ID,CreationTime,State,PipeSource,PipeProcessor,PipeSink,ExceptionMessage,RemainingEventCount,EstimatedRemainingSeconds,",
- Collections.emptySet());
- }
+ TestUtils.executeNonQuery(
+ senderEnv,
+ String.format(
+ "create pipe a2b with sink ('node-urls'='%s')",
receiverDataNode.getIpAndPortString()),
+ null);
+
+ TestUtils.executeNonQuery(senderEnv, "insert into root.db.d1(time,s1)
values (1,1)", null);
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select count(*) from root.**",
+ "count(root.db.d1.s1),",
+ Collections.singleton("1,"));
+
+ await()
+ .pollInSameThread()
+ .pollDelay(1L, TimeUnit.SECONDS)
+ .pollInterval(1L, TimeUnit.SECONDS)
+ .atMost(600, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement();
+ final ResultSet result = statement.executeQuery("show
pipes")) {
+ int pipeNum = 0;
+ while (result.next()) {
+ final String pipeName =
result.getString(ColumnHeaderConstant.ID);
+ if (!pipeName.contains(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)
+ && pipeName.endsWith("_history")) {
+ pipeNum++;
+ }
+ }
+ Assert.assertEquals(0, pipeNum);
+ }
+ });
}
@Test
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoSplitIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoSplitIT.java
index 5f732b6d390..ced1e9ee13c 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoSplitIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoSplitIT.java
@@ -23,6 +23,7 @@ import
org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
@@ -37,6 +38,8 @@ import org.junit.runner.RunWith;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -78,13 +81,12 @@ public class IoTDBPipeAutoSplitIT extends
AbstractPipeDualAutoIT {
public void testSingleEnv() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
- final String sql =
- String.format(
- "create pipe a2b with source ('source'='iotdb-source') with
processor ('processor'='do-nothing-processor') with sink ('node-urls'='%s')",
- receiverDataNode.getIpAndPortString());
try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
- statement.execute(sql);
+ statement.execute(
+ String.format(
+ "create pipe a2b with sink ('node-urls'='%s')",
+ receiverDataNode.getIpAndPortString()));
} catch (final SQLException e) {
fail(e.getMessage());
}
@@ -100,5 +102,48 @@ public class IoTDBPipeAutoSplitIT extends
AbstractPipeDualAutoIT {
|| (Objects.equals(showPipeResult.get(1).id, "a2b_history")
&& Objects.equals(showPipeResult.get(0).id,
"a2b_realtime")));
}
+
+ // Do not split for pipes without insertion or non-full
+ TestUtils.executeNonQueries(
+ senderEnv,
+ Arrays.asList(
+ "drop pipe a2b_history",
+ "drop pipe a2b_realtime",
+ String.format(
+ "create pipe a2b1 with source ('inclusion'='schema') with sink
('node-urls'='%s')",
+ receiverDataNode.getIpAndPortString()),
+ String.format(
+ "create pipe a2b2 with source ('realtime.enable'='false') with
sink ('node-urls'='%s')",
+ receiverDataNode.getIpAndPortString()),
+ String.format(
+ "create pipe a2b3 with source ('history.enable'='false') with
sink ('node-urls'='%s')",
+ receiverDataNode.getIpAndPortString())),
+ null);
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
+ Assert.assertEquals(3, showPipeResult.size());
+ }
+
+ TestUtils.executeNonQueries(
+ senderEnv,
+ Arrays.asList(
+ "drop pipe a2b1",
+ "drop pipe a2b2",
+ "drop pipe a2b3",
+ "insert into root.test.device(time, field) values(0,1),(1,2)",
+ "delete from root.test.device.* where time == 0",
+ String.format(
+ "create pipe a2b with source ('inclusion'='all') with sink
('node-urls'='%s')",
+ receiverDataNode.getIpAndPortString())),
+ null);
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select * from root.test.device",
+ "Time,root.test.device.field,",
+ Collections.singleton("1,2.0,"));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 9038e7c3a71..67b9460c15a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -586,7 +586,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
}
}
- public boolean isFullSync(final PipeParameters parameters) {
+ public boolean isFullSync(final PipeParameters parameters) throws
IllegalPathException {
if (isSnapshotMode(parameters)) {
return false;
}
@@ -600,7 +600,10 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY,
SOURCE_REALTIME_ENABLE_KEY),
EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE);
- return isHistoryEnable && isRealtimeEnable;
+ return isHistoryEnable
+ && isRealtimeEnable
+ &&
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters)
+ .getLeft();
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index d582832864a..99eee84964e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -140,6 +140,7 @@ import
org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
+import org.apache.iotdb.db.pipe.source.dataregion.DataRegionListeningFilter;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
@@ -1826,10 +1827,10 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
new PipeParameters(createPipeStatement.getSourceAttributes());
final PipeParameters sinkPipeParameters =
new PipeParameters(createPipeStatement.getSinkAttributes());
- if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled()
- && PipeDataNodeAgent.task().isFullSync(sourcePipeParameters)) {
- try (final ConfigNodeClient configNodeClient =
-
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ try (final ConfigNodeClient configNodeClient =
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled()
+ && PipeDataNodeAgent.task().isFullSync(sourcePipeParameters)) {
// 1. Send request to create the real-time data synchronization
pipeline
final TCreatePipeReq realtimeReq =
new TCreatePipeReq()
@@ -1874,11 +1875,19 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
Boolean.toString(false),
PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
Boolean.toString(true),
- // We force the historical pipe to
transfer data only
+ PipeSourceConstant.EXTRACTOR_MODE_KEY,
+
PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE,
+ // We force the historical pipe to
transfer data (and maybe
+ // deletion) only
// Thus we can transfer schema only once
// And may drop the historical pipe on
successfully transferred
PipeSourceConstant.SOURCE_INCLUSION_KEY,
-
PipeSourceConstant.EXTRACTOR_INCLUSION_DEFAULT_VALUE,
+ DataRegionListeningFilter
+
.parseInsertionDeletionListeningOptionPair(
+ sourcePipeParameters)
+ .getRight()
+ ? "data"
+ :
PipeSourceConstant.EXTRACTOR_INCLUSION_DEFAULT_VALUE,
PipeSourceConstant.SOURCE_EXCLUSION_KEY,
PipeSourceConstant.EXTRACTOR_EXCLUSION_DEFAULT_VALUE)))
.getAttribute())
@@ -1901,31 +1910,24 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
// 3. Set success status only if both pipelines are created
successfully
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
- } catch (final Exception e) {
- // Catch any other exceptions (e.g., network issues)
- future.setException(e);
- }
- return future;
- }
-
- try (final ConfigNodeClient configNodeClient =
-
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
- TCreatePipeReq req =
- new TCreatePipeReq()
- .setPipeName(pipeName)
-
.setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
-
.setExtractorAttributes(createPipeStatement.getSourceAttributes())
-
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
- .setConnectorAttributes(createPipeStatement.getSinkAttributes());
- TSStatus tsStatus = configNodeClient.createPipe(req);
- if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
- LOGGER.warn(
- "Failed to create pipe {} in config node, status is {}.",
- createPipeStatement.getPipeName(),
- tsStatus);
- future.setException(new IoTDBException(tsStatus));
} else {
- future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ final TCreatePipeReq req =
+ new TCreatePipeReq()
+ .setPipeName(pipeName)
+
.setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
+
.setExtractorAttributes(createPipeStatement.getSourceAttributes())
+
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
+
.setConnectorAttributes(createPipeStatement.getSinkAttributes());
+ final TSStatus tsStatus = configNodeClient.createPipe(req);
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode())
{
+ LOGGER.warn(
+ "Failed to create pipe {} in config node, status is {}.",
+ createPipeStatement.getPipeName(),
+ tsStatus);
+ future.setException(new IoTDBException(tsStatus));
+ } else {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ }
}
} catch (final Exception e) {
future.setException(e);