This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 355a872cfae Pipe: Fixed the bugs that a split historical pipe may not
include mods for deletions & that a pipe without data.insert may be wrongly
split and transfer data (#17346)
355a872cfae is described below
commit 355a872cfaeb77e1e7fe68d78a90544b3054d3a5
Author: Caideyipi <[email protected]>
AuthorDate: Wed Mar 25 17:44:39 2026 +0800
Pipe: Fixed the bugs that a split historical pipe may not include mods for
deletions & that a pipe without data.insert may be wrongly split and
transfer data (#17346)
* fix
* fix
* f
---
.../treemodel/auto/basic/IoTDBPipeAutoSplitIT.java | 54 ++++++++++++++++++++--
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 7 ++-
.../config/executor/ClusterConfigTaskExecutor.java | 52 ++++++++++-----------
3 files changed, 80 insertions(+), 33 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
index 31308f1c0e1..2ebad93348c 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
@@ -23,6 +23,7 @@ import
org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
@@ -39,6 +40,8 @@ import org.junit.runner.RunWith;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -81,13 +84,12 @@ public class IoTDBPipeAutoSplitIT extends
AbstractPipeDualTreeModelAutoIT {
public void testSingleEnv() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
- final String sql =
- String.format(
- "create pipe a2b with source ('source'='iotdb-source') with
processor ('processor'='do-nothing-processor') with sink ('node-urls'='%s')",
- receiverDataNode.getIpAndPortString());
try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
- statement.execute(sql);
+ statement.execute(
+ String.format(
+ "create pipe a2b with sink ('node-urls'='%s')",
+ receiverDataNode.getIpAndPortString()));
} catch (final SQLException e) {
fail(e.getMessage());
}
@@ -104,5 +106,47 @@ public class IoTDBPipeAutoSplitIT extends
AbstractPipeDualTreeModelAutoIT {
|| (Objects.equals(showPipeResult.get(1).id, "a2b_history")
&& Objects.equals(showPipeResult.get(0).id,
"a2b_realtime")));
}
+
+ // Do not split pipes that exclude insertion or disable history/realtime (non-full)
+ TestUtils.executeNonQueries(
+ senderEnv,
+ Arrays.asList(
+ "drop pipe a2b_history",
+ "drop pipe a2b_realtime",
+ String.format(
+ "create pipe a2b1 with source ('inclusion'='schema') with sink
('node-urls'='%s')",
+ receiverDataNode.getIpAndPortString()),
+ String.format(
+ "create pipe a2b2 with source ('realtime.enable'='false') with
sink ('node-urls'='%s')",
+ receiverDataNode.getIpAndPortString()),
+ String.format(
+ "create pipe a2b3 with source ('history.enable'='false') with
sink ('node-urls'='%s')",
+ receiverDataNode.getIpAndPortString())));
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
+ showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
+ Assert.assertEquals(3, showPipeResult.size());
+ }
+
+ TestUtils.executeNonQueries(
+ senderEnv,
+ Arrays.asList(
+ "drop pipe a2b1",
+ "drop pipe a2b2",
+ "drop pipe a2b3",
+ "insert into root.test.device(time, field) values(0,1),(1,2)",
+ "delete from root.test.device.* where time == 0",
+ String.format(
+ "create pipe a2b with source ('inclusion'='all') with sink
('node-urls'='%s')",
+ receiverDataNode.getIpAndPortString())));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select * from root.test.device",
+ "Time,root.test.device.field,",
+ Collections.singleton("1,2.0,"));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 0f8b9446d60..ea12513d647 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -584,7 +584,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
}
}
- public boolean isFullSync(final PipeParameters parameters) {
+ public boolean isFullSync(final PipeParameters parameters) throws
IllegalPathException {
if (isSnapshotMode(parameters)) {
return false;
}
@@ -598,7 +598,10 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY,
SOURCE_REALTIME_ENABLE_KEY),
EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE);
- return isHistoryEnable && isRealtimeEnable;
+ return isHistoryEnable
+ && isRealtimeEnable
+ &&
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters)
+ .getLeft();
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index d28edddb2f9..d0627351b27 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -181,6 +181,7 @@ import
org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
+import org.apache.iotdb.db.pipe.source.dataregion.DataRegionListeningFilter;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
@@ -2205,10 +2206,10 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
new PipeParameters(createPipeStatement.getSourceAttributes());
final PipeParameters sinkPipeParameters =
new PipeParameters(createPipeStatement.getSinkAttributes());
- if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled()
- && PipeDataNodeAgent.task().isFullSync(sourcePipeParameters)) {
- try (final ConfigNodeClient configNodeClient =
-
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ try (final ConfigNodeClient configNodeClient =
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled()
+ && PipeDataNodeAgent.task().isFullSync(sourcePipeParameters)) {
// 1. Send request to create the real-time data synchronization
pipeline
final TCreatePipeReq realtimeReq =
new TCreatePipeReq()
@@ -2253,11 +2254,17 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
Boolean.toString(false),
PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
Boolean.toString(true),
- // We force the historical pipe to
transfer data only
+ // We force the historical pipe to
transfer data (and maybe
+ // deletion) only
// Thus we can transfer schema only once
// And may drop the historical pipe on
successfully transferred
PipeSourceConstant.SOURCE_INCLUSION_KEY,
-
PipeSourceConstant.EXTRACTOR_INCLUSION_DEFAULT_VALUE,
+ DataRegionListeningFilter
+
.parseInsertionDeletionListeningOptionPair(
+ sourcePipeParameters)
+ .getRight()
+ ? "data"
+ :
PipeSourceConstant.EXTRACTOR_INCLUSION_DEFAULT_VALUE,
PipeSourceConstant.SOURCE_EXCLUSION_KEY,
PipeSourceConstant.EXTRACTOR_EXCLUSION_DEFAULT_VALUE)))
.getAttribute())
@@ -2280,27 +2287,20 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
// 3. Set success status only if both pipelines are created
successfully
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
- } catch (final Exception e) {
- // Catch any other exceptions (e.g., network issues)
- future.setException(e);
- }
- return future;
- }
-
- try (final ConfigNodeClient configNodeClient =
-
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
- final TCreatePipeReq req =
- new TCreatePipeReq()
- .setPipeName(pipeName)
-
.setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
-
.setExtractorAttributes(createPipeStatement.getSourceAttributes())
-
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
- .setConnectorAttributes(createPipeStatement.getSinkAttributes());
- final TSStatus tsStatus = configNodeClient.createPipe(req);
- if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
- future.setException(new IoTDBException(tsStatus));
} else {
- future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ final TCreatePipeReq req =
+ new TCreatePipeReq()
+ .setPipeName(pipeName)
+
.setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
+
.setExtractorAttributes(createPipeStatement.getSourceAttributes())
+
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
+
.setConnectorAttributes(createPipeStatement.getSinkAttributes());
+ final TSStatus tsStatus = configNodeClient.createPipe(req);
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode())
{
+ future.setException(new IoTDBException(tsStatus));
+ } else {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ }
}
} catch (final Exception e) {
future.setException(e);