This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6a7665a0b2c Pipe: split full-sync pipe into history and realtime pipes
(#16250)
6a7665a0b2c is described below
commit 6a7665a0b2c3612b13ed6dcd9111d5e956ebd326
Author: VGalaxies <[email protected]>
AuthorDate: Fri Aug 29 09:53:58 2025 +0800
Pipe: split full-sync pipe into history and realtime pipes (#16250)
* setup
* minor improve
* apply Copilot review
* add internal config
* fixup
* fixup
* add IoTDBPipeAutoSplitIT
* set if not exists always to true to handle partial failure
---
.../it/env/cluster/config/MppCommonConfig.java | 6 ++
.../env/cluster/config/MppSharedCommonConfig.java | 7 ++
.../it/env/remote/config/RemoteCommonConfig.java | 5 +
.../org/apache/iotdb/itbase/env/CommonConfig.java | 2 +
.../manual/AbstractPipeTableModelDualManualIT.java | 6 +-
.../manual/basic/IoTDBPipePermissionIT.java | 6 +-
.../tablemodel/manual/basic/IoTDBPipeSourceIT.java | 6 +-
.../enhanced/IoTDBPipeSinkCompressionIT.java | 6 +-
.../auto/AbstractPipeDualTreeModelAutoIT.java | 6 +-
.../treemodel/auto/basic/IoTDBPipeAutoSplitIT.java | 106 +++++++++++++++++++++
.../treemodel/auto/basic/IoTDBPipeSourceIT.java | 6 +-
.../auto/enhanced/IoTDBPipeSinkCompressionIT.java | 6 +-
.../manual/AbstractPipeDualTreeModelManualIT.java | 6 +-
.../treemodel/manual/IoTDBPipePermissionIT.java | 6 +-
.../iotdb/pipe/it/single/AbstractPipeSingleIT.java | 3 +-
.../relational/it/schema/IoTDBDatabaseIT.java | 12 ++-
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 17 ++++
.../config/executor/ClusterConfigTaskExecutor.java | 74 ++++++++++++++
.../apache/iotdb/commons/conf/CommonConfig.java | 10 ++
.../iotdb/commons/pipe/config/PipeConfig.java | 8 ++
.../iotdb/commons/pipe/config/PipeDescriptor.java | 6 ++
21 files changed, 289 insertions(+), 21 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index 8a0f6c483dd..818d674a9ea 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -538,6 +538,12 @@ public class MppCommonConfig extends MppBaseConfig
implements CommonConfig {
return this;
}
+ @Override
+ public CommonConfig setPipeAutoSplitFullEnabled(boolean
pipeAutoSplitFullEnabled) {
+ setProperty("pipe_auto_split_full_enabled",
String.valueOf(pipeAutoSplitFullEnabled));
+ return this;
+ }
+
@Override
public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
setProperty("chunk_timeseriesmeta_free_memory_proportion",
queryMemoryProportion);
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index b58d5c132e8..002057ab3d7 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -554,6 +554,13 @@ public class MppSharedCommonConfig implements CommonConfig
{
return this;
}
+ @Override
+ public CommonConfig setPipeAutoSplitFullEnabled(boolean
pipeAutoSplitFullEnabled) {
+ dnConfig.setPipeAutoSplitFullEnabled(pipeAutoSplitFullEnabled);
+ cnConfig.setPipeAutoSplitFullEnabled(pipeAutoSplitFullEnabled);
+ return this;
+ }
+
@Override
public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
dnConfig.setQueryMemoryProportion(queryMemoryProportion);
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index f18828f5afc..eff6c61deae 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -391,6 +391,11 @@ public class RemoteCommonConfig implements CommonConfig {
return this;
}
+ @Override
+ public CommonConfig setPipeAutoSplitFullEnabled(boolean
pipeAutoSplitFullEnabled) {
+ return this;
+ }
+
@Override
public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
return this;
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index d4cc7c2e1ed..ce4a57a807d 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -172,6 +172,8 @@ public interface CommonConfig {
CommonConfig setPipeConnectorRequestSliceThresholdBytes(
int pipeConnectorRequestSliceThresholdBytes);
+ CommonConfig setPipeAutoSplitFullEnabled(boolean pipeAutoSplitFullEnabled);
+
CommonConfig setQueryMemoryProportion(String queryMemoryProportion);
CommonConfig setDataNodeMemoryProportion(String dataNodeMemoryProportion);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
index 49b5c6d368a..55c0170aacd 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
@@ -51,7 +51,8 @@ public abstract class AbstractPipeTableModelDualManualIT {
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setEnforceStrongPassword(false)
.setPipeMemoryManagementEnabled(false)
- .setIsPipeEnableMemoryCheck(false);
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(false);
receiverEnv
.getConfig()
.getCommonConfig()
@@ -60,7 +61,8 @@ public abstract class AbstractPipeTableModelDualManualIT {
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setEnforceStrongPassword(false)
.setPipeMemoryManagementEnabled(false)
- .setIsPipeEnableMemoryCheck(false);
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(false);
// 10 min, assert that the operations will not time out
senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
index 2275ddf505a..803880202d1 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
@@ -71,7 +71,8 @@ public class IoTDBPipePermissionIT extends
AbstractPipeTableModelDualManualIT {
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDnConnectionTimeoutMs(600000)
.setPipeMemoryManagementEnabled(false)
- .setIsPipeEnableMemoryCheck(false);
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(false);
receiverEnv
.getConfig()
.getCommonConfig()
@@ -86,7 +87,8 @@ public class IoTDBPipePermissionIT extends
AbstractPipeTableModelDualManualIT {
.setDataReplicationFactor(2)
.setDnConnectionTimeoutMs(600000)
.setPipeMemoryManagementEnabled(false)
- .setIsPipeEnableMemoryCheck(false);
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(false);
senderEnv.initClusterEnvironment();
receiverEnv.initClusterEnvironment(3, 3);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java
index a43e9f11c6f..d4cdf521304 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java
@@ -74,7 +74,8 @@ public class IoTDBPipeSourceIT extends
AbstractPipeTableModelDualManualIT {
.setEnableCrossSpaceCompaction(false)
.setDnConnectionTimeoutMs(600000)
.setPipeMemoryManagementEnabled(false)
- .setIsPipeEnableMemoryCheck(false);
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(false);
senderEnv.getConfig().getConfigNodeConfig().setLeaderDistributionPolicy("HASH");
receiverEnv
.getConfig()
@@ -84,7 +85,8 @@ public class IoTDBPipeSourceIT extends
AbstractPipeTableModelDualManualIT {
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDnConnectionTimeoutMs(600000)
.setPipeMemoryManagementEnabled(false)
- .setIsPipeEnableMemoryCheck(false);
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(false);
senderEnv.initClusterEnvironment();
receiverEnv.initClusterEnvironment();
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java
index d1056effae3..74c43b8df34 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java
@@ -74,7 +74,8 @@ public class IoTDBPipeSinkCompressionIT extends
AbstractPipeTableModelDualManual
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDnConnectionTimeoutMs(600000)
.setPipeMemoryManagementEnabled(false)
- .setIsPipeEnableMemoryCheck(false);
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(false);
senderEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:1");
receiverEnv
.getConfig()
@@ -85,7 +86,8 @@ public class IoTDBPipeSinkCompressionIT extends
AbstractPipeTableModelDualManual
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDnConnectionTimeoutMs(600000)
.setPipeMemoryManagementEnabled(false)
- .setIsPipeEnableMemoryCheck(false);
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(false);
senderEnv.initClusterEnvironment();
receiverEnv.initClusterEnvironment();
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/AbstractPipeDualTreeModelAutoIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/AbstractPipeDualTreeModelAutoIT.java
index 9443256fac9..2e1cf5aff3d 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/AbstractPipeDualTreeModelAutoIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/AbstractPipeDualTreeModelAutoIT.java
@@ -59,7 +59,8 @@ public abstract class AbstractPipeDualTreeModelAutoIT {
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setEnforceStrongPassword(false)
.setPipeMemoryManagementEnabled(false)
- .setIsPipeEnableMemoryCheck(false);
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(false);
senderEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:1");
receiverEnv
.getConfig()
@@ -68,7 +69,8 @@ public abstract class AbstractPipeDualTreeModelAutoIT {
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setPipeMemoryManagementEnabled(false)
- .setIsPipeEnableMemoryCheck(false);
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(false);
receiverEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:9:1");
// 10 min, assert that the operations will not time out
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
new file mode 100644
index 00000000000..4cc50e97323
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.dual.treemodel.auto.basic;
+
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.it.env.MultiEnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeAutoBasic;
+import
org.apache.iotdb.pipe.it.dual.treemodel.auto.AbstractPipeDualTreeModelAutoIT;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Objects;
+
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2DualTreeAutoBasic.class})
+public class IoTDBPipeAutoSplitIT extends AbstractPipeDualTreeModelAutoIT {
+
+ @Override
+ @Before
+ public void setUp() {
+ MultiEnvFactory.createEnv(2);
+ senderEnv = MultiEnvFactory.getEnv(0);
+ receiverEnv = MultiEnvFactory.getEnv(1);
+ senderEnv
+ .getConfig()
+ .getCommonConfig()
+ .setAutoCreateSchemaEnabled(true)
+ .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setEnforceStrongPassword(false)
+ .setPipeMemoryManagementEnabled(false)
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(true); // set auto split to true
+ receiverEnv
+ .getConfig()
+ .getCommonConfig()
+ .setAutoCreateSchemaEnabled(true)
+ .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setPipeMemoryManagementEnabled(false)
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(true); // set auto split to true
+ senderEnv.initClusterEnvironment();
+ receiverEnv.initClusterEnvironment();
+ }
+
+ @Test
+ public void testSingleEnv() throws Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ final String sql =
+ String.format(
+ "create pipe a2b with source ('source'='iotdb-source') with
processor ('processor'='do-nothing-processor') with sink ('node-urls'='%s')",
+ receiverDataNode.getIpAndPortString());
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(sql);
+ } catch (final SQLException e) {
+ fail(e.getMessage());
+ }
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
+ Assert.assertEquals(2, showPipeResult.size());
+ Assert.assertTrue(
+ (Objects.equals(showPipeResult.get(0).id, "a2b_history")
+ && Objects.equals(showPipeResult.get(1).id, "a2b_realtime"))
+ || (Objects.equals(showPipeResult.get(1).id, "a2b_history")
+ && Objects.equals(showPipeResult.get(0).id,
"a2b_realtime")));
+ }
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
index caad46ea625..6f95d3d1f6c 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
@@ -74,7 +74,8 @@ public class IoTDBPipeSourceIT extends
AbstractPipeDualTreeModelAutoIT {
.setEnableUnseqSpaceCompaction(false)
.setEnableCrossSpaceCompaction(false)
.setPipeMemoryManagementEnabled(false)
- .setIsPipeEnableMemoryCheck(false);
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(false);
senderEnv.getConfig().getConfigNodeConfig().setLeaderDistributionPolicy("HASH");
receiverEnv
.getConfig()
@@ -83,7 +84,8 @@ public class IoTDBPipeSourceIT extends
AbstractPipeDualTreeModelAutoIT {
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setPipeMemoryManagementEnabled(false)
- .setIsPipeEnableMemoryCheck(false);
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(false);
receiverEnv.getConfig().getConfigNodeConfig().setLeaderDistributionPolicy("HASH");
// 10 min, assert that the operations will not time out
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
index 1ac37a6c768..40d8c31725f 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
@@ -71,7 +71,8 @@ public class IoTDBPipeSinkCompressionIT extends
AbstractPipeDualTreeModelAutoIT
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setPipeMemoryManagementEnabled(false)
- .setIsPipeEnableMemoryCheck(false);
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(false);
receiverEnv
.getConfig()
@@ -81,7 +82,8 @@ public class IoTDBPipeSinkCompressionIT extends
AbstractPipeDualTreeModelAutoIT
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setPipeMemoryManagementEnabled(false)
- .setIsPipeEnableMemoryCheck(false);
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(false);
// 10 min, assert that the operations will not time out
senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java
index d18a3340eb4..b704125ca30 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java
@@ -58,7 +58,8 @@ public abstract class AbstractPipeDualTreeModelManualIT {
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setPipeMemoryManagementEnabled(false)
- .setIsPipeEnableMemoryCheck(false);
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(false);
senderEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:1");
receiverEnv
@@ -68,7 +69,8 @@ public abstract class AbstractPipeDualTreeModelManualIT {
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setPipeMemoryManagementEnabled(false)
- .setIsPipeEnableMemoryCheck(false);
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(false);
// 10 min, assert that the operations will not time out
senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
index bdd9b850c8a..bc2f2558b64 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
@@ -66,7 +66,8 @@ public class IoTDBPipePermissionIT extends
AbstractPipeDualTreeModelManualIT {
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDnConnectionTimeoutMs(600000)
.setPipeMemoryManagementEnabled(false)
- .setIsPipeEnableMemoryCheck(false);
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(false);
senderEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:1");
receiverEnv
.getConfig()
@@ -80,7 +81,8 @@ public class IoTDBPipePermissionIT extends
AbstractPipeDualTreeModelManualIT {
.setDataReplicationFactor(2)
.setDnConnectionTimeoutMs(600000)
.setPipeMemoryManagementEnabled(false)
- .setIsPipeEnableMemoryCheck(false);
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(false);
senderEnv.initClusterEnvironment();
receiverEnv.initClusterEnvironment(3, 3);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
index 96c6a801ffd..3bb3ecce21e 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
@@ -37,7 +37,8 @@ abstract class AbstractPipeSingleIT {
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
.setPipeMemoryManagementEnabled(false)
- .setIsPipeEnableMemoryCheck(false);
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(false);
env.initClusterEnvironment();
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java
index e9df4cf853b..a9736613e72 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java
@@ -56,9 +56,17 @@ public class IoTDBDatabaseIT {
@Before
public void setUp() throws Exception {
-
EnvFactory.getEnv().getConfig().getCommonConfig().setEnforceStrongPassword(false);
+ EnvFactory.getEnv()
+ .getConfig()
+ .getCommonConfig()
+ .setEnforceStrongPassword(false)
+ .setPipeAutoSplitFullEnabled(false);
// enable subscription
-
EnvFactory.getEnv().getConfig().getCommonConfig().setSubscriptionEnabled(true);
+ EnvFactory.getEnv()
+ .getConfig()
+ .getCommonConfig()
+ .setSubscriptionEnabled(true)
+ .setPipeAutoSplitFullEnabled(false);
EnvFactory.getEnv().initClusterEnvironment();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 4d3644aabc9..7e881401838 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -587,6 +587,23 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
}
}
+ public boolean isFullSync(final PipeParameters parameters) {
+ if (isSnapshotMode(parameters)) {
+ return false;
+ }
+
+ final boolean isHistoryEnable =
+ parameters.getBooleanOrDefault(
+ Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY,
SOURCE_HISTORY_ENABLE_KEY),
+ EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
+ final boolean isRealtimeEnable =
+ parameters.getBooleanOrDefault(
+ Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY,
SOURCE_REALTIME_ENABLE_KEY),
+ EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE);
+
+ return isHistoryEnable && isRealtimeEnable;
+ }
+
private boolean isSnapshotMode(final PipeParameters parameters) {
final boolean isSnapshotMode;
if (parameters.hasAnyAttributes(EXTRACTOR_MODE_SNAPSHOT_KEY,
SOURCE_MODE_SNAPSHOT_KEY)) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 4d3a607d61f..e44b1c0983e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -53,6 +53,7 @@ import
org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginClassLoader;
import
org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginExecutableManager;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
import org.apache.iotdb.commons.pipe.datastructure.visibility.Visibility;
@@ -314,6 +315,7 @@ import org.apache.iotdb.udf.api.relational.ScalarFunction;
import org.apache.iotdb.udf.api.relational.TableFunction;
import
org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification;
+import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.thrift.TException;
@@ -2007,6 +2009,78 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
return future;
}
+ // Syntactic sugar: if full-sync mode is detected (i.e. not snapshot mode,
and both realtime
+ // and history are enabled), the pipe is split into a history-only pipe and a
realtime-only pipe.
+ final PipeParameters extractorPipeParameters =
+ new PipeParameters(createPipeStatement.getExtractorAttributes());
+ if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled()
+ && PipeDataNodeAgent.task().isFullSync(extractorPipeParameters)) {
+ try (final ConfigNodeClient configNodeClient =
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ // 1. Send request to create the historical data synchronization
pipeline
+ final TCreatePipeReq historyReq =
+ new TCreatePipeReq()
+ // Append suffix to the pipeline name for historical data
+ .setPipeName(createPipeStatement.getPipeName() + "_history")
+ // NOTE: IF NOT EXISTS is always set to true here to handle partial failure
(e.g. a retry after only the history pipe was created)
+ .setIfNotExistsCondition(true)
+ // Use extractor parameters for historical data
+ .setExtractorAttributes(
+ extractorPipeParameters
+ .addOrReplaceEquivalentAttributesWithClone(
+ new PipeParameters(
+ ImmutableMap.of(
+
PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
+ Boolean.toString(true),
+
PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY,
+ Boolean.toString(false))))
+ .getAttribute())
+
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
+
.setConnectorAttributes(createPipeStatement.getConnectorAttributes());
+
+ final TSStatus historyTsStatus =
configNodeClient.createPipe(historyReq);
+ // If creation fails, immediately return with exception
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() !=
historyTsStatus.getCode()) {
+ future.setException(new IoTDBException(historyTsStatus));
+ return future;
+ }
+
+ // 2. Send request to create the real-time data synchronization
pipeline
+ final TCreatePipeReq realtimeReq =
+ new TCreatePipeReq()
+ // Append suffix to the pipeline name for real-time data
+ .setPipeName(createPipeStatement.getPipeName() + "_realtime")
+
.setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
+ // Use extractor parameters for real-time data
+ .setExtractorAttributes(
+ extractorPipeParameters
+ .addOrReplaceEquivalentAttributesWithClone(
+ new PipeParameters(
+ ImmutableMap.of(
+
PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
+ Boolean.toString(false),
+
PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY,
+ Boolean.toString(true))))
+ .getAttribute())
+
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
+
.setConnectorAttributes(createPipeStatement.getConnectorAttributes());
+
+ final TSStatus realtimeTsStatus =
configNodeClient.createPipe(realtimeReq);
+ // If creation fails, immediately return with exception
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() !=
realtimeTsStatus.getCode()) {
+ future.setException(new IoTDBException(realtimeTsStatus));
+ return future;
+ }
+
+ // 3. Set success status only if both pipelines are created
successfully
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ } catch (final Exception e) {
+ // Catch any other exceptions (e.g., network issues)
+ future.setException(e);
+ }
+ return future;
+ }
+
try (final ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TCreatePipeReq req =
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 61212fbd5a2..c5d0ecc558f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -337,6 +337,8 @@ public class CommonConfig {
private boolean pipeEventReferenceTrackingEnabled = true;
private long pipeEventReferenceEliminateIntervalSeconds = 10;
+ private boolean pipeAutoSplitFullEnabled = true;
+
private boolean subscriptionEnabled = false;
private float subscriptionCacheMemoryUsagePercentage = 0.2F;
@@ -2083,6 +2085,14 @@ public class CommonConfig {
pipeEventReferenceEliminateIntervalSeconds);
}
+ public boolean getPipeAutoSplitFullEnabled() {
+ return pipeAutoSplitFullEnabled;
+ }
+
+ public void setPipeAutoSplitFullEnabled(boolean pipeAutoSplitFullEnabled) {
+ this.pipeAutoSplitFullEnabled = pipeAutoSplitFullEnabled;
+ }
+
public boolean getSubscriptionEnabled() {
return subscriptionEnabled;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 887d89b0279..21605ee70e2 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -427,6 +427,12 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeEventReferenceEliminateIntervalSeconds();
}
+ /////////////////////////////// Syntactic sugar
///////////////////////////////
+
+ public boolean getPipeAutoSplitFullEnabled() {
+ return COMMON_CONFIG.getPipeAutoSplitFullEnabled();
+ }
+
/////////////////////////////// Utils ///////////////////////////////
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConfig.class);
@@ -622,6 +628,8 @@ public class PipeConfig {
LOGGER.info(
"PipeEventReferenceEliminateIntervalSeconds: {}",
getPipeEventReferenceEliminateIntervalSeconds());
+
+ LOGGER.info("PipeAutoSplitFullEnabled: {}", getPipeAutoSplitFullEnabled());
}
/////////////////////////////// Singleton ///////////////////////////////
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 9087db068ed..8009fa27dea 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -547,6 +547,12 @@ public class PipeDescriptor {
Long.parseLong(
properties.getProperty(
"pipe_max_wait_finish_time",
String.valueOf(config.getPipeMaxWaitFinishTime()))));
+
+ config.setPipeAutoSplitFullEnabled(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "pipe_auto_split_full_enabled",
+ String.valueOf(config.getPipeAutoSplitFullEnabled()))));
}
public static void loadPipeExternalConfig(